聊聊reactive streams的Mono及Flux

本文主要讲一下reactive streams的Publisher接口的两个抽象类Mono与Flux

Publisher

reactive-streams-1.0.1-sources.jar!/org/reactivestreams/Publisher.java

/**
 * A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to
 * the demand received from its {@link Subscriber}(s).
 * <p>
 * A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link #subscribe(Subscriber)} dynamically
 * at various points in time.
 *
 * @param <T> the type of element signaled.
 */
public interface Publisher<T> {

    /**
     * Request {@link Publisher} to start streaming data.
     * <p>
     * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
     * <p>
     * Each {@link Subscription} will work for only a single {@link Subscriber}.
     * <p>
     * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
     * <p>
     * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
     * signal the error via {@link Subscriber#onError}.
     *
     * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
     */
    public void subscribe(Subscriber<? super T> s);
}

Mono

reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/Mono.java

public abstract class Mono<T> implements Publisher<T> {
    //...
    /**
     * Expose the specified {@link Publisher} with the {@link Mono} API, and ensure it will emit 0 or 1 item.
     * The source emitter will be cancelled on the first `onNext`.
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/from1.png" alt="">
     * <p>
     * @param source the {@link Publisher} source
     * @param <T> the source type
     *
     * @return the next item emitted as a {@link Mono}
     */
    public static <T> Mono<T> from(Publisher<? extends T> source) {
        if (source instanceof Mono) {
            @SuppressWarnings("unchecked")
            Mono<T> casted = (Mono<T>) source;
            return casted;
        }
        if (source instanceof Flux) {
            @SuppressWarnings("unchecked")
            Flux<T> casted = (Flux<T>) source;
            return casted.next();
        }
        return onAssembly(new MonoFromPublisher<>(source));
    }

    /**
     * Create a new {@link Mono} that emits the specified item, which is captured at
     * instantiation time.
     *
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/just.png" alt="">
     * <p>
     * @param data the only item to onNext
     * @param <T> the type of the produced item
     *
     * @return a {@link Mono}.
     */
    public static <T> Mono<T> just(T data) {
        return onAssembly(new MonoJust<>(data));
    }
    //...
}

Flux

reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/Flux.java

public abstract class Flux<T> implements Publisher<T> {
    //......
    /**
     * Programmatically create a {@link Flux} with the capability of emitting multiple
     * elements in a synchronous or asynchronous manner through the {@link FluxSink} API.
     * <p>
     * This Flux factory is useful if one wants to adapt some other multi-valued async API
     * and not worry about cancellation and backpressure (which is handled by buffering
     * all signals if the downstream can't keep up).
     * <p>
     * For example:
     *
     * <pre><code>
     * Flux.&lt;String&gt;create(emitter -&gt; {
     *
     *     ActionListener al = e -&gt; {
     *         emitter.next(textField.getText());
     *     };
     *     // without cleanup support:
     *
     *     button.addActionListener(al);
     *
     *     // with cleanup support:
     *
     *     button.addActionListener(al);
     *     emitter.onDispose(() -> {
     *         button.removeListener(al);
     *     });
     * }, FluxSink.OverflowStrategy.LATEST);
     * </code></pre>
     *
     * @param <T> The type of values in the sequence
     * @param backpressure the backpressure mode, see {@link OverflowStrategy} for the
     * available backpressure modes
     * @param emitter Consume the {@link FluxSink} provided per-subscriber by Reactor to generate signals.
     * @return a {@link Flux}
     */
    public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
        return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
    }

    /**
     * Decorate the specified {@link Publisher} with the {@link Flux} API.
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/from.png" alt="">
     * <p>
     * @param source the source to decorate
     * @param <T> The type of values in both source and output sequences
     *
     * @return a new {@link Flux}
     */
    public static <T> Flux<T> from(Publisher<? extends T> source) {
        if (source instanceof Flux) {
            @SuppressWarnings("unchecked")
            Flux<T> casted = (Flux<T>) source;
            return casted;
        }

        if (source instanceof Fuseable.ScalarCallable) {
            try {
                @SuppressWarnings("unchecked") T t =
                        ((Fuseable.ScalarCallable<T>) source).call();
                if (t != null) {
                    return just(t);
                }
                return empty();
            }
            catch (Exception e) {
                return error(e);
            }
        }
        return wrap(source);
    }

    /**
     * Programmatically create a {@link Flux} by generating signals one-by-one via a
     * consumer callback and some state, with a final cleanup callback. The
     * {@code stateSupplier} may return {@literal null} but your cleanup {@code stateConsumer}
     * will need to handle the null case.
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/generate.png" alt="">
     * <p>
     *
     * @param <T> the value type emitted
     * @param <S> the per-subscriber custom state type
     * @param stateSupplier called for each incoming Subscriber to provide the initial state for the generator bifunction
     * @param generator Consume the {@link SynchronousSink} provided per-subscriber by Reactor
     * as well as the current state to generate a <strong>single</strong> signal on each pass
     * and return a (new) state.
     * @param stateConsumer called after the generator has terminated or the downstream cancelled, receiving the last
     * state to be handled (i.e., release resources or do other cleanup).
     *
     * @return a {@link Flux}
     */
    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer) {
        return onAssembly(new FluxGenerate<>(stateSupplier, generator, stateConsumer));
    }
}

实例

Mono

    @Test
    public void testMonoBasic(){
        Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
        Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
        Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
    }

Mono ,是指最多只能触发(emit) (事件)一次。它对应于 RxJava 库的 Single 和 Maybe 类型或者是java的Optional。因此一个异步任务,如果只是想要在完成时给出完成信号,就可以使用 Mono<Void>。

调用 Flux<T>的single()将返回一个 Mono<T>,而连接两个 monos一起使用 concatWith 将产生一个 Flux。

Flux

    @Test
    public void testBasic(){
        Flux.just("Hello", "World").subscribe(System.out::println);
        Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
        Flux.empty().subscribe(System.out::println);
        Flux.range(1, 10).subscribe(System.out::println);
        Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
    }

Flux 相当于一个 RxJava Observable,能够发出 0~N 个数据项,然后(可选地)completing 或 erroring。处理多个数据项作为stream。

小结

Mono和Flux都是实现Publisher接口的抽象类,一个相当于Optional,一个相当于有0..N的stream。两个都是spring 5 reactive编程的重要基础概念。

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容