Reactive、Reactor和webflux

开题

Reactor顾名思义核反应堆,光听名字就知道它有多强了,首先Reactor是异步非阻塞的,基于netty,而tomcat不是,一个请求一个线程(除了Servlet3.1以上),使用Reactor就是整个代码在执行链上也更清晰,做过前端的同学应该很有感悟,不管是jquery还是vue都是一级一级往下点,那种感觉真的很爽,当然java8也有类似体验。所以诞生之初,这个东西就不是为了java而生的,java是重语言,强调稳定性,直到jdk1.8也不愿意为reactor妥协。最终Spring按奈不住了,率先把reactor集成至自己函数库中,所以Spring的版本至少是Spring5,而jdk至少是1.7,(因为Spring5实现了很多关于响应式编程的东西),然后webflux坑很深,完全看上去像另一门语言(重点),所以学习成本相对陡峭,我更多从使用者方向去思考。
国内使用这个技术的公司好像没几家,除了我上家公司以外(用的也不是很好),首先我知道的有阿里,当然也只是一部分技术部分,也是我同学告诉我的,然后我在这家公司刚做技术选型时注册中心还在纠结eureka还是nacos,因为我来这之前根本不会nacos,只是听过,用eureka倒是很熟,但是看到nacos支持响应式编程,我还是很开心的,加上其他一些因素还是选定了nacos。
所以大家暂时不用担心,这个技术暂时还不会取代java命令式编程,因为兼容其他中间件还需要时间,但是也是一个警告。

命令式编程和响应式编程区别

命令式编程一行一个代码,我们很明确就能知道,下一行代码跟上一行代码关系,因为是按步骤一步一步往下走的,最终返回的那个结果是上面一行一行代码组合最终的呈现结果。

而响应式编程不一样,它不会再描述每一步我们要进行的步骤,它只描述你要构建数据将要流经的管道,当数据流经管道时,可以对它们进行某种形式的修改或者使用。这样做的好处是我们不再关注每一行代码是做什么的(想象有100行代码),只需要关注管道最终返回的结果是什么,然后依据上一个管代的结果,流到我们这个管道需要做什么。每个管道都是异步非阻塞的。

主要原因是Servlet是阻塞和多线程的,每个连接都会使用一个线程。在请求处理的时候,会在线程池中拉取一个worker线程来对请求进行处理。同时,请求线程是阻塞的,直到worker线程提示它完成为止。这也带来的后果就是阻塞式Web框架在大量请求无法有效地扩展。缓慢的worker线程所带来的延迟会使情况变得更糟,因为它将花费更长的时间才能将worker线程送回池中,准备处理另一个请求。在某些场景中,这种设计完全可以接受。事实上,这种方式也是这十年来Web应用程序的开发方式,但是时代在改变。这种方式适合以前偶尔浏览网站的人们,而现在人们会频繁消费HTTPAPI,他们会持续地和Web API交换数据。

事件轮询请求

数据库支持

  1. 2.3执行时间和普通比。
  2. Spring Data Reactive不支持 MySQL,进一步也不支持 MySQL 事务。所以用了 Reactive 原来的 spring 事务管理就不好用了。jdbc jpa 的事务是基于阻塞 IO 模型的,如果 Spring Data Reactive 没有升级 IO 模型去支持 JDBC,生产上的应用只能使用不强依赖事务的。

Reactor的主要类

在Reactor中,经常使用的类并不多,主要有以下两个:

  • Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者(Publisher)
  • Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发布者

Publisher

Mono和Flux都是Publisher,发布者起到发送流数据作用。

Subscriber

1.Subscriber,因为一次只请求一个元素会导致本身效率低下。
2.为了验证是不是一次请求一个元素,fromInter 或 range。
onComplete因为是多线程,为了防止发布者和订阅者结束后有个通知,否则会造成周期竞争。
onComplete或onError都会触发终止订阅

Subscription 和 Processor

发布者、订阅者关系流程

Backpressure

指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略,可能最终导致溢出。subscription

doOnSubscribe 和 doOnNext

  1. doOnSubscribe是事件被订阅之前(也就是事件源发起之前)会调用的方法, 它一般执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
  2. doOnNext是观察者被通知之前(也就是回调之前)会调用的方法,说白了就是最终回调之前的前一个回调方法,这个方法一般做的事件类似于观察者做的事情,只是自己不是最终的回调者。(观察者即最终回调者)

Mono和Flux

开发者应只关注Publisher ,如果开发中间件,redis、dubbo,甚至nacos, Web Flux 则会自动帮我们实现 Subscriber

Flux类的静态方法

  • just():可以指定序列中包含的全部元素。
  • fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
  • empty():创建一个不包含任何元素,只发布结束消息的序列。
  • error(Throwable error):创建一个只包含错误消息的序列。
  • range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
  • interval(Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
  • concat,类似于Mono的zip(但是不一样,这个不会返回tuple)。
  • concatWith,类似于Mono的zipWith(但是不一样,这个不会返回tuple)。
  • concatMapIterable(Arrays.asList),会在Flux.just(1,2,3)每个元素中逐个穿插指定集合元素,或对当前Flux进行数据操作(比如逐个元素加2)。
  • defer():一种懒创建方式,对比just。
  • MathFlux.sumInt(Flux.range),这是新出的一种封装,MathFlux有多种函数计算实现,可以根据不同场景选型。替换可.as()。也可替换reduce也能实现上述功能,但是reduce功能更加强大,不仅可以对数字甚至任何类型都可以。
  • generate() 方法同步和逐一的方式来产生 Flux 序列,next()方法只能最多被调用一次,不调用 complete()方法,所产生的是一个无限序列。
  • create() 跟上面类似,只是它允许有多个元素。
  • buffer(int) 和 bufferUntil(), 这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件。
  • concatWith(Flux)、onErrorResume()、onErrorReturn、doFinally 和 retry() 异常处理对比java。
  • collectList() 和 collect(Collectors.toList()) ,Mono转Flux。

Mono类的静态方法

  • zipWith():不需要上一个Mono的结果(类型可以不一样)。
  • zipWhen():需要上一个Mono的结果(类型可以不一样)。
  • zip():组装多个Mono(类型可以不一样)。
  • flatMapMany():Mono转Flux。
  • delayElement,类似于Thread.sleep,可以结合map(同步)、flatMap(异步)。

Flux和Mono共有方法

  • transform():抽出公共部分组装。
  • defer():同Flux。
  • publishOn(Schedulers) 和 subscribeOn(Schedulers),可以动态切换线程,可以结合buffer、log使用。

Schedulers 类有如下几种对上下文操作的静态方法:

  • immediate():无执行上下文,提交的Runnable将直接在原线程上执行,可以理解没有调度
  • single():可重用单线程,使用一个线程处理所有请求
  • elastic(): 没有边界的弹性线程池
  • boundedElastic():有边界弹性线程池,设置线程限制,默认为cpu核心数*10。达到上限后最多可以提交10万个任务。是阻塞线程的方法
  • parallel(): 固定线程数量的并行线程池,线程数量和cpu内核一样多

WebFlux

RouterFunction 类似 Spring Web 的 @RequestMapping 。RouterFunction 用来定义 Spring 5 应用的路由信息。RouterFunctions 助手类包含一些有用的方法,例如 route 定义路由并构建 RouterFunction 对象。RequestPredicates 包含大量有用的方法如 GET, POST, path, queryParam ,accept, headers, contentType 等等,可用来定义路由和构建 RouterFunction。每个 Route 映射到一个处理方法,当接收到 HttpRequest 请求的时候就会调用。

Mono<ServerResponse> 是在配置控制器方法中返回的,而不是controller。

RouterFunction 为应用程序提供了 DSL 风格的路由功能。此时,Spring 并不支持两种风格混合使用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,392评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,258评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,417评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,992评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,930评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,199评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,652评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,327评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,463评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,382评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,432评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,118评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,704评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,787评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,999评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,476评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,057评论 2 341