Spring Reactor 操作符详解

Spring 5.x 中的reactor-core包

下一篇:Spring Reactor map与flatMap操作符 详解
第三篇:Spring Reactor parallel并发与线程切换 实战
Reactor是什么:基于Reactor-Stream规范实现反应式编程范例,指的是一种面向数据流并传播事件的异步编程范式;当然也可以是同步;

jar包引入:

<dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.3.0.RELEASE</version>
</dependency>

Reactor反应式编程,是一种流水线式的开发,自从和他亲密接触之后,感觉已经离不开他;

reactor-core包提供的核心类主要是:

  • Mono : 可以产生和处理0或1个事件;
  • Flux:可以产生和处理0,,,N个事件;

Flux(Mono)主要操作符:

整理了一些,但远不止这些;


图片描述

1、创建类操作符

create操作符


方法范例:我们创建一个可以发射任意事件类型的被观察者,T是继承Object;
Flux.create(new Consumer<FluxSink<T>>(){})
// 被观察者
Flux<Integer> flux = Flux.create(new Consumer<FluxSink<Integer>>() {
            @Override
            public void accept(FluxSink<Integer> integerFluxSink) {
                integerFluxSink.next(1);
                integerFluxSink.next(2);
                integerFluxSink.complete();
                integerFluxSink.next(3);
            }
        });
   // 观察者
   Subscriber<Integer> subscriber = new Subscriber<Integer>() {
       @Override
       public void onSubscribe(Subscription s) {
           s.request(Integer.MAX_VALUE);
       }
       @Override
       public void onNext(Integer integer) {
           System.out.println("消费"+integer);
       }
       @Override
       public void onError(Throwable t) {
           System.out.println("错误");
       }
       @Override
       public void onComplete() {
           System.out.println("完成");
       }
   };
   flux.subscribe(subscriber);

为了方便理解,把被观察者比作上游流水线(也可以比作生产者);观察者比作下游流水线(消费者);

图片描述
  • integerFluxSink 就是我们的发射器,可以发生N个事件,通过next() 完成;
  • 当integerFluxSink发射complete()或error()事件之后,integerFluxSink还可以继续发生next()事件,但是下游流水线接受complete/error事件之后,停止接受任何事件;
  • 下游接受到事件之后,进入onNext()方法;完成进入onComplete()方法,错误onError()方法
  • 上游可以不发射complete()和error() 方法,管道处于监控状态,不会执行下游onComplete()方法;
  • 关于onSubscribe方法,后面说到背压时会讲到;

just操作符

Flux.just("1", "2","3").subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Integer.MAX_VALUE);
            }
            @Override
            public void onNext(String s) {
                System.out.println("s");
            }
            @Override
            public void onError(Throwable t) {
                System.out.println("错误");
            }
            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        });

just 相当于一个特殊数组,这个特殊的数组可以同时存放,不同类型的数据,我这里放了相同类型的String;

just和create区别:
create:根据自己的需求手动创建事件并发送;
just: 将已有的数据交给Flux发送,发送完成之后会自定发送complete事件;

From 系列操作符

  1. fromStream(Stream<? extends T> s)
  2. fromIterable(Iterable<? extends T> it)
  3. fromArray(T[] array)
  4. from(Publisher<? extends T> source)

使用说明

  1. fromStream: 可以放入一个jdk8中的Stream流;
  2. fromIterable:放入一个集合,所有实现了Collection集合接口的类都可以放入,如List,Set集合;
  3. fromArray:放入一个数组;他和just区别是,just可以同时放如各种数据类型的数据;
  4. from: 类似发射器,更像一个回调函数,可以发生N个事件;
  5. 他们的使用和create 与just类似;
Flux.from(new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> s) {
                s.onNext(1);
                s.onNext(1);
                s.onComplete();
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {

            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("消费:"+integer);
            }
            @Override
            public void onError(Throwable t) {
                System.out.println("错误");
            }
            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        });

from和create的区别

  • from :通过直接引入下游管道来传输数据,一旦上游流水线与下游流水线发生订阅,就把事件之间传递给下游;
  • create 是一个发射器,发送的数据,需要下游发起请求并订阅,上游才会把事件传递给下游;
  • 在onSubscribe方法中,create操作符需要调用s.request(Integer.MAX_VALUE); from才不需要这个调用;

empty()/ error()/ never()

  • Flux.empty() : 发送一个complete完成事件;
  • Flux.error(Throwable error) 发射一个Throwable错误事件
  • Flux.never() 不发送事件

时间类操作符

interval 操作符

CountDownLatch latch = new CountDownLatch(1);
 Flux.interval(Duration.ofSeconds(2),java.time.Duration.ofSeconds(3)).subscribe(new Consumer<Long>() {
      @Override
      public void accept(Long aLong) {
          System.out.println(aLong+"=="+Thread.currentThread().getName());
      }
  });
  latch.await();
  输出内容:
  0==parallel-1
  1==parallel-1
  、、、
  • Interval:每隔相应的时间发送一次,
    • 第一个参数:第一次执行的延迟时间
    • 第二个参数:每隔多少秒发送一次事件,发送的内容是Long类型整数,从0开始;
  • interval 默认是在子线程执行事件发送的;

timeout 操作符

Flux.create(new Consumer<FluxSink<Integer>>() {
       @Override
       public void accept(FluxSink<Integer> fluxSink) {
           fluxSink.next(1);
           try {
               TimeUnit.SECONDS.sleep(3);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           fluxSink.next(3);
       }
   }).timeout(Duration.ofSeconds(2), new Publisher<Integer>() {
       @Override
       public void subscribe(Subscriber<? super Integer> s) {
           s.onNext(2);
       }
   }).subscribe(new Consumer<Integer>() {
       @Override
       public void accept(Integer integer) {
           System.out.println("内容:"+integer);
       }
   });
   输出内容:
   内容:1
   内容:2

  • timeout :在超时之后做相应的时间
    • 第一个参数:设置超时时间,我这里设置了2秒,2秒上游没有把事件发送过来,进入timeout,发送一个2;
    • 第二个参数:超时之后会进入的操作
  • 上游一旦超时,就会丢失下面所有的事件,不会继续传给下游,之后走timeout的逻辑;

defer 操作符

Flux<Integer> defer = Flux.defer(new Supplier<Publisher<Integer>>() {
            @Override
            public Publisher<Integer> get() {
                return Flux.just(new Random().nextInt(10));
            }
        });
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("=="+integer);
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        };
        defer.subscribe(subscriber);
  • defer : 延迟初始化被观察者,直到上游流水线与下游流水线发生订阅后,才会创建被观察者。
  • 在Spring Gateway中有普遍使用;

delay系列操作符

CountDownLatch latch = new CountDownLatch(1);
  Flux.just("1","2","3").delayElements(Duration.ofSeconds(2)).subscribe(new Consumer<String>() {
      @Override
      public void accept(String s) {
          System.out.println(Thread.currentThread().getName()+"=="+s);
      }
  });
  latch.await();
  输出结果:
  parallel-1==1
  parallel-2==2
  parallel-3==3
  • delayElements: 每一个原始事件都会延迟2秒,才发送,知道发送完成
  • 默认在子线程中完整

根据时间定时发送、超时、延迟发送的操作符大致是这些了;类似的还有很多实用基本相似;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容