网上有很多大佬已经写过了这个响应式流的原理,这里就不在说明了。
网上查了很多大多数都是基于jdk9实现,但公司还在用jdk8,所以就自己研究了一下,分享给大家,共同学习。
官网:http://www.reactive-streams.org
github:https://github.com/reactive-streams/reactive-streams-jvm/tree/v1.0.4
这个包之定义了响应式流接口,并在jdk9时加入到了jdk中,在jdk9中对接口进行了代码的实现,所以在jdk9之后可以直接使用(封装的真好,使用很方便)
,但在jdk8及之前的版本想要使用响应式流就要引入这个包并自己实现接口。
这里写了一个小demo用于练习理解响应式流
导入reactive-streams这个包
maven
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
</dependency>
jar包提供了4个接口和1个jdk9及以上的适配器
class org.reactivestreams.FlowAdapters
interface org.reactivestreams.Processor
interface org.reactivestreams.Publisher
interface org.reactivestreams.Subscriber
interface org.reactivestreams.Subscription
1、创建用于传输的类
public class User {
private Integer id;
private String name;
// 有参构造
public User(Integer id, String name){
this.id = id;
this.name = name;
}
// 省略 get set 方法
}
2、创建订阅中心 UserSubscription
public class UserSubscription implements Subscription {
public static final ConcurrentLinkedQueue<User> inbound = new ConcurrentLinkedQueue<>();
private Subscriber subscriber;
public UserSubscription(Subscriber subscriber){
this.subscriber = subscriber;
}
@Override
public void request(long l) {
new Thread(() -> {
User user = inbound.poll();
subscriber.onNext(user);
}).start();
}
@Override
public void cancel() {
System.out.println("【订阅者】取消订阅");
}
}
3、创建发布者 UserPublisher
public class UserPublisher implements Publisher<User> {
@Override
public void subscribe(Subscriber<? super User> subscriber) {
subscriber.onSubscribe(new UserSubscription(subscriber));
}
public void submit(User user){
UserSubscription.inbound.offer(user);
System.out.println("【发布者】发布消息:{id:"+user.getId()+", name:"+user.getName()+"}");
}
}
4、创建订阅者 UserSubscriber
public class UserSubscriber implements Subscriber<User> {
private Subscription subscription;
/**
* 初始化Subscription,并且表示可以开始接受订阅数据了
* @param subscription
*/
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
System.out.println("【订阅者】订阅成功");
// 订阅者必须发送需求信号,才能接受onNext信号
subscription.request(1); //这里只接受一个数据元素
}
/**
* 接受下一项订阅数据的回调方法
* @param user
*/
@Override
public void onNext(User user) {
boolean flag = true;
if(user != null){
while(flag){
System.out.println("【订阅者】接受数据:{id:"+user.getId()+", name:"+user.getName()+"}");
flag = false;
}
}
this.subscription.request(1);
}
/**
* 在Publisher或Subscriber遇到不可恢复的错误时调用此方法,Subscriber不在接受订阅消息
*
* 不得调用Subscription或Publisher上的任何方法
* 目的时防止在处理完成信号期间发生Subscription或Publisher之间的周期竞争条件
* 如果收到此信号必须考虑取消订阅
* 目的只确保Subscriber遵从Subscription信号状态。在收到onError信号后,订阅就失效
* @param throwable
*/
@Override
public void onError(Throwable throwable) {
System.out.println("【订阅者】出现异常");
this.subscription.cancel();
}
/**
* 当接受完所有订阅数据,并且发布者已经关闭后会回调这个方法
*
* 不得调用Subscription或Publisher上的任何方法
* 目的时防止在处理完成信号期间发生Subscription或Publisher之间的周期竞争条件
* 如果收到此信号必须考虑取消订阅
* 目的只确保Subscriber遵从Subscription信号状态。在收到onComplete信号后,订阅就失效
*/
@Override
public void onComplete() {
System.out.println("【订阅者】数据接受完成");
this.subscription.cancel();
}
}
5、main方法
public class UserMain {
public static void main(String[] args) throws Exception {
// 1、创建发布者
UserPublisher publisher = new UserPublisher();
// 2、创建订阅者
UserSubscriber subscriber = new UserSubscriber();
// 3、发布者和订阅者建立订阅关系
publisher.subscribe(subscriber);
// 4、发布者发布消息
for(int i = 0; i < 10; i++){
publisher.submit(new User(i, "wpixel"));
Thread.sleep(1000);
}
}
}
6、Console
【订阅者】订阅成功
【发布者】发布消息:{id:0, name:wpixel}
【订阅者】接受数据:{id:0, name:wpixel}
【订阅者】接受数据:{id:1, name:wpixel}
【发布者】发布消息:{id:1, name:wpixel}
【发布者】发布消息:{id:2, name:wpixel}
【订阅者】接受数据:{id:2, name:wpixel}
【发布者】发布消息:{id:3, name:wpixel}
【订阅者】接受数据:{id:3, name:wpixel}
【发布者】发布消息:{id:4, name:wpixel}
【订阅者】接受数据:{id:4, name:wpixel}
【发布者】发布消息:{id:5, name:wpixel}
【订阅者】接受数据:{id:5, name:wpixel}
【发布者】发布消息:{id:6, name:wpixel}
【订阅者】接受数据:{id:6, name:wpixel}
【发布者】发布消息:{id:7, name:wpixel}
【订阅者】接受数据:{id:7, name:wpixel}
【发布者】发布消息:{id:8, name:wpixel}
【订阅者】接受数据:{id:8, name:wpixel}
【发布者】发布消息:{id:9, name:wpixel}
【订阅者】接受数据:{id:9, name:wpixel}
作者是一名自由程序员,住在杭州,喜欢音乐、小说、旅行、以及编程。
P.S. 如果您喜欢这篇文章并且希望学习编程技术的话,请关注一下