Spring 5.x 中的reactor-core包
下一篇:Spring Reactor map与flatMap操作符 详解
第三篇:Spring Reactor parallel并发与线程切换 实战
Reactor是什么:基于Reactor-Stream规范实现反应式编程范例,指的是一种面向数据流并传播事件的异步编程范式;当然也可以是同步;
jar包引入:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.0.RELEASE</version>
</dependency>
Reactor反应式编程,是一种流水线式的开发,自从和他亲密接触之后,感觉已经离不开他;
reactor-core包提供的核心类主要是:
- Mono : 可以产生和处理0或1个事件;
- Flux:可以产生和处理0,,,N个事件;
Flux(Mono)主要操作符:
整理了一些,但远不止这些;
1、创建类操作符
create操作符
方法范例:我们创建一个可以发射任意事件类型的被观察者,T是继承Object;
Flux.create(new Consumer<FluxSink<T>>(){})
// 被观察者
Flux<Integer> flux = Flux.create(new Consumer<FluxSink<Integer>>() {
@Override
public void accept(FluxSink<Integer> integerFluxSink) {
integerFluxSink.next(1);
integerFluxSink.next(2);
integerFluxSink.complete();
integerFluxSink.next(3);
}
});
// 观察者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
System.out.println("消费"+integer);
}
@Override
public void onError(Throwable t) {
System.out.println("错误");
}
@Override
public void onComplete() {
System.out.println("完成");
}
};
flux.subscribe(subscriber);
为了方便理解,把被观察者比作上游流水线(也可以比作生产者);观察者比作下游流水线(消费者);
- integerFluxSink 就是我们的发射器,可以发生N个事件,通过next() 完成;
- 当integerFluxSink发射complete()或error()事件之后,integerFluxSink还可以继续发生next()事件,但是下游流水线接受complete/error事件之后,停止接受任何事件;
- 下游接受到事件之后,进入onNext()方法;完成进入onComplete()方法,错误onError()方法
- 上游可以不发射complete()和error() 方法,管道处于监控状态,不会执行下游onComplete()方法;
- 关于onSubscribe方法,后面说到背压时会讲到;
just操作符
Flux.just("1", "2","3").subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
System.out.println("s");
}
@Override
public void onError(Throwable t) {
System.out.println("错误");
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
just 相当于一个特殊数组,这个特殊的数组可以同时存放,不同类型的数据,我这里放了相同类型的String;
just和create区别:
create:根据自己的需求手动创建事件并发送;
just: 将已有的数据交给Flux发送,发送完成之后会自定发送complete事件;
From 系列操作符
- fromStream(Stream<? extends T> s)
- fromIterable(Iterable<? extends T> it)
- fromArray(T[] array)
- from(Publisher<? extends T> source)
使用说明
- fromStream: 可以放入一个jdk8中的Stream流;
- fromIterable:放入一个集合,所有实现了Collection集合接口的类都可以放入,如List,Set集合;
- fromArray:放入一个数组;他和just区别是,just可以同时放如各种数据类型的数据;
- from: 类似发射器,更像一个回调函数,可以发生N个事件;
- 他们的使用和create 与just类似;
Flux.from(new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onNext(1);
s.onNext(1);
s.onComplete();
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(Integer integer) {
System.out.println("消费:"+integer);
}
@Override
public void onError(Throwable t) {
System.out.println("错误");
}
@Override
public void onComplete() {
System.out.println("完成");
}
});
from和create的区别
- from :通过直接引入下游管道来传输数据,一旦上游流水线与下游流水线发生订阅,就把事件之间传递给下游;
- create 是一个发射器,发送的数据,需要下游发起请求并订阅,上游才会把事件传递给下游;
- 在onSubscribe方法中,create操作符需要调用s.request(Integer.MAX_VALUE); from才不需要这个调用;
empty()/ error()/ never()
- Flux.empty() : 发送一个complete完成事件;
- Flux.error(Throwable error) 发射一个Throwable错误事件
- Flux.never() 不发送事件
时间类操作符
interval 操作符
CountDownLatch latch = new CountDownLatch(1);
Flux.interval(Duration.ofSeconds(2),java.time.Duration.ofSeconds(3)).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) {
System.out.println(aLong+"=="+Thread.currentThread().getName());
}
});
latch.await();
输出内容:
0==parallel-1
1==parallel-1
、、、
- Interval:每隔相应的时间发送一次,
- 第一个参数:第一次执行的延迟时间
- 第二个参数:每隔多少秒发送一次事件,发送的内容是Long类型整数,从0开始;
- interval 默认是在子线程执行事件发送的;
timeout 操作符
Flux.create(new Consumer<FluxSink<Integer>>() {
@Override
public void accept(FluxSink<Integer> fluxSink) {
fluxSink.next(1);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
fluxSink.next(3);
}
}).timeout(Duration.ofSeconds(2), new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> s) {
s.onNext(2);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("内容:"+integer);
}
});
输出内容:
内容:1
内容:2
- timeout :在超时之后做相应的时间
- 第一个参数:设置超时时间,我这里设置了2秒,2秒上游没有把事件发送过来,进入timeout,发送一个2;
- 第二个参数:超时之后会进入的操作
- 上游一旦超时,就会丢失下面所有的事件,不会继续传给下游,之后走timeout的逻辑;
defer 操作符
Flux<Integer> defer = Flux.defer(new Supplier<Publisher<Integer>>() {
@Override
public Publisher<Integer> get() {
return Flux.just(new Random().nextInt(10));
}
});
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(Integer integer) {
System.out.println("=="+integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
defer.subscribe(subscriber);
- defer : 延迟初始化被观察者,直到上游流水线与下游流水线发生订阅后,才会创建被观察者。
- 在Spring Gateway中有普遍使用;
delay系列操作符
CountDownLatch latch = new CountDownLatch(1);
Flux.just("1","2","3").delayElements(Duration.ofSeconds(2)).subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(Thread.currentThread().getName()+"=="+s);
}
});
latch.await();
输出结果:
parallel-1==1
parallel-2==2
parallel-3==3
- delayElements: 每一个原始事件都会延迟2秒,才发送,知道发送完成
- 默认在子线程中完整
根据时间定时发送、超时、延迟发送的操作符大致是这些了;类似的还有很多实用基本相似;