@TOC
Version 5.2.0.RELEASE
本文档是介绍响应式技术栈,它基于响应式流API构建,运行于Netty、Undertow、Servlet 3.1+等非阻塞服务容器之上。之后的章节分别介绍了Spring WebFlux框架、响应式WebClient、测试支持及响应式库。有关Servlet技术栈的web应用文档,请转到Servlet Web技术栈。
1 Spring WebFlux
Spring中原本的的web框架,Spring Web MVC,是为Servlet API和Servlet容器构建的。而响应式web框架WebFlux是在5.0版本添加的。它是完全非阻塞的,且支持响应式流背压,可以运行在Netty、Undertow和Servlet 3.1+等容器上。
这两个框架的模块名模块名spring-webmvc和spring-webflux是它们各自的写照,在Spring框架中共存。应用程序即可择其一用之,在某些情况下亦可同时用之。例如使用基于响应式WebClient
的Spring MVC控制器。
1.1 概述
为什么要开发Spring Webflux呢?
一部分原因是需要通过非阻塞web技术栈使用更少量的线程及硬件资源处理并发。Servlet 3.1的确提供了非租塞I/O相关API,然而,这些API和原有的Servlet API,如同步的(Filter
、Servlet
)或阻塞的(getParameter
、getPart
)相去甚远。这就是设计新的通用非阻塞运行时基础API的动机。因为像Netty这样的服务器在异步、非阻塞方面做得非常好,所以通用性、基础性非常重要。
另一部分原因是函数式编程。就像Java 5注解的引入带来的的增强(基于注解的REST控制器或单元测试),Java 8引入的lambda表达式使Java拥有了使用函数式API的机会。它有助于构建非阻塞应用和声明式组合异步逻辑的持续式风格API(流行于CompletableFuture
和ReactiveX)的使用。从编程模型来看,Java 8使Spring WebFlux结合注解控制器提供函数式web终端成为可能。
1.1.1 “响应式”的定义
我们一直在谈论“非阻塞”和“函数式”,但响应式到底是什么意思呢?
术语“响应式”是指围绕对变化的响应而构建的编程模型,例如,网络组件响应I/O事件、UI控件响应鼠标事件等。这种场景是非阻塞的,因为在操作完成或数据可用时才做出反应,在这之前程序不会因等待而阻塞。
Spring团队在“响应式”上提供的另一重要机制是非阻塞背压。在同步的、命令式的代码中,强制调用方等待的阻塞构成了背压的自然形式。而在非阻塞代码中,为了不让速度快的生产者冲垮目标,控制事件的流速变得重要起来。
响应式流是一个小型标准(亦被Java 9采纳),定义了背压场景下异步组件间的交互。例如,数据仓库(作为发布者)产生数据,HTTP服务器(作为订阅者)读取并写入响应。响应式流的主要目的就是使订阅者可以控制发布者生产数据的快慢。
通常疑问:如果发布者不能慢下来怎么办?
响应式流只建立机制和边界。如果发布者不能慢下来,则考虑使用缓冲区、抛弃或让操作失败。
1.1.2 响应式API
响应式流在互操作性上扮演了重要角色。它关注的是库的基础设施组件,因为太底层了,所以在应用API上用处不大。应用程序需要丰富的高级函数式API组合异步逻辑,和Java 8 的Stream
API相似,但不仅仅是集合操作。这就轮到响应式库登场了。
Spring WebFlux选择的是Reactor响应式库。它的API提供了Mono类型和Flux类型,可以使用一系列基于ReactiveX的操作符分别操作0..1(Mono
)和0..N(Flux
)的数据序列。Reactor是一个响应式流库,故所有操作都支持非阻塞背压。Reactor重点关注服务器端Java。在开发时就注重和Spring的协作。
Reactor是WebFlux的核心依赖,但WebFlux也可以和其他响应式流库合作。WebFlux API的通用模式是:接收普通的Publisher
作为输入,在内部将其转为Reactor类型并使用,返回Flux
或Mono
作为输出。所以可以传入任意类型的Publisher
作为输入,并在输出上执行操作。但需要将输出和所选响应式库进行适配。只要可行(如使用注解的控制器),WebFlux就会透明地适配RxJava或其他响应式库。参考响应式库获取更多详细信息。
作为响应式API的扩展,WebFlux同时支持在Kotlin中使用协程API,它可以让编程风格更加命令式。后文将提供使用协程API的Kotlin代码示例。
1.1.3 编程模型
spring-web
模块包含了Spring WebFlux的响应式基础部分,包括HTTP抽象、响应式流对所支持服务器的适配器、编解码器以及类似Servlet API但非阻塞的核心Web处理器API。
在这之上,Spring WebFlux提供了两种编程模型:
-
使用注解的控制器 - 顺承Spring MVC,同样基于
spring-web
模块的注解。Spring MVC和WebFlux都支持响应式(Reactor、RxJava)返回类型,彼此分辨并不容易。值得注意的一点不同是WebFlux还支持响应式@RequestBody
参数。 - 函数式终端 - 基于lambda、轻量级、函数式编程模型。可以将其视为一个小型代码库或一系列工具方法,应用程序用它处理路由和请求。函数式终端和使用注解的控制器间最大的区别在于,前者负责从头到尾的请求处理,后者通过注解声明意图并回调。
1.1.4 适用性
Spring MVC 还是 WebFlux?
这是一个自然的提问,但也包含了不必要的对立。实际上,两者可以同时工作,拓宽了可选项的范围。两者在设计上和对方是持续统一的,它们可以并肩作战,双方都可以享受到对方带来的好处。下图展示了两者的关系,哪些是它们共有的,哪些是各自独有的:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PPZVeUoe-1571103878397)(https://docs.spring.io/spring/docs/5.2.0.RELEASE/spring-framework-reference/images/spring-mvc-and-webflux-venn.png)]
我们建议您考虑如下要素:
- 如果应用使用Spring MVC工作的很好,那就没必要更换。命令式程序代码在编写、理解和调试上都是最简单的。而且,因为历史原因,应用中选用的绝大多数类库都是阻塞的。
- 如果您已经考察过非阻塞web技术栈,Spring WebFlux提供相同的执行模型优势,而且还提供了多种服务器选项(Netty、Tomcat、Jetty、Undertow、Servlet 3.1+容器)、多种编程模型(使用注解的控制器和函数式web终端)及多种响应式库选项(Reactor、RxJava等)。
- 如果对使用Kotlin或Java 8 lambda表达式的轻量级函数式web框架感兴趣,可以使用Spring WebFlux的函数式web终端。对小型应用或没有复杂依赖的微服务也是个不错的选择,可以从更好的透明度和控制中受益。
- 在微服务架构中,Spring MVC和Spring WebFlux控制器或Spring WebFlux函数式终端可以混合使用。在框架间使用相同的注解编程模型不仅使知识的复用更加简单,也是用正确的工具做正确的事。
- 简单地评估应用的一个方法是检查其依赖。如果有阻塞持久化API(JPA、JDBC)或阻塞网络API的使用,Spring MVC至少是通用架构的最佳选择。使用Reactor和RxJava在不同的线程上处理阻塞式调用在技术上是可行的,但不能充分发挥非阻塞式web技术栈的作用。
- 如果Spring MVC应用中存在远程服务调用,可以试一试响应式
WebClient
。Spring MVC控制器方法可以直接返回响应式类型(Reactor、RxJava或其他)。调用的延迟越长,或调用间的依赖性越强,优势就越大。Spring MVC也可以调用其他响应式组件。 - 如果团队很大,就要考虑切换至非阻塞、函数式、声明式编程陡峭的学习曲线。部分切换的可行起步方案是使用响应式
WebClient
。然后,一点点开始并权衡其优势。我们期望对多数程序来说,切换是不必要的。如果不确定要寻找什么好处,可以通过学习非阻塞I/O的工作方式(例如Node.js的单线程并发)及其影响开始。
1.1.5 服务器
Spring WebFlux支持Tomcat、Jetty、Servlet 3.1+及Netty和Undertow等非Servlet运行时。它们都会适配底层通用API,从而在服务器间支持高级编程模型。
Spring WebFlux没有内建服务器启动和关闭的支持。但是通过Spring配置和WebFlux基础设施可以通过数行代码很简单地在应用中嵌入并启动服务器。
Spring Boot有一个WebFlux起步器,会自动执行这些步骤。默认情况下,起步器使用Netty,但可以通过修改Maven或Gradle依赖轻松切换至Tomcat、Jetty或Undertow。Spring Boot默认选择Netty,是因为它广泛用于异步、非阻塞领域,而且可以使客户端和服务器共享资源。
Tomcat和Jetty既可以配合Spring MVC,也可以配合Spring WebFlux使用。但要注意,使用的方式是完全不同的。Spring MVC依赖Servlet阻塞I/O,如有需要应用直接使用Servlet API。Spring WebFlux依赖Servlet 3.1非阻塞I/O且Servlet API基于一个底层适配器提供,不会直接暴露。
对Undertow来说,Spring WebFlux直接使用Undertow API而不用Servlet API。
1.1.6 性能 vs 规模
性能具有多种特征和意义。响应式和非阻塞通常不能让应用跑的更快,但在在某些场景下(如使用WebClient
并发执行远程调用)可以。总的来说,以非阻塞方式执行操作需要的工作更多,且会轻微增加处理时间。
响应式和非阻塞最可预见的好处是其可以使用较少固定数量的线程和少量内存规模的能力。因为其硬件规模更加可预测,所以应用程序在欠载时更有弹性。不过,要见识到这些好处需要一定的潜伏期(包括一系列缓慢且不可预测的网络I/O)。这才是响应式技术栈秀肌肉的场合,其改变会极具戏剧性。
1.1.7 并发模型
Spring MVC和Spring WebFlux都支持使用注解的控制器,但两者的关键不同在于并发模型以及默认对阻塞和线程的假定。
Spring MVC(以及所有Servlet应用)假定应用会阻塞当前线程(如远程调用),由此,Servlet容器需要使用一个巨大的线程池以应对潜在的请求处理阻塞。
Spring WebFlux(以及所有非阻塞服务器)假定应用不会阻塞,因此,非阻塞服务器使用小规模固定大小的线程池(事件循环作业器)处理请求。
“扩展性”和“少量线程”听起来是矛盾的,但永不阻塞当前线程(其依赖回调),就不用额外线程承载阻塞调用,所以也就不需要额外的线程。
调用阻塞式API
如果确实需要使用阻塞库怎么办?Reactor和RxJava都提供了publishOn
操作符用于在额外线程上持续处理。也就是说兼容很简单。然而还是要记住,阻塞式API和并发模型并不能良好契合。
可变状态
在Reactor和RxJava中,逻辑是通过操作符声明的,之后在运行时的不同阶段,会组织一条响应式管线顺序处理数据。这样做的关键优点是将应用从保护可变状态中解放了出来,因为管线中的应用代码永不会并发执行。
线程模型
在使用了Spring WebFlux的服务器上,线程是什么样的呢?
- 在“普通的”Spring WebFlux服务器上(例如,没有数据访问也没有其他可选依赖),服务器只使用一条线程加上用于处理请求的其他若干个线程(通常等于CPU的核心数量)。不过Servlet容器可能会启动更多个线程(例如,Tomcat会启动10个)用于同时支持Servlet(阻塞)I/O及Servlet 3.1(非阻塞)I/O的使用。
- 响应式
WebClient
的操作是事件循环风格的。一簇小型固定数量的处理线程与之关联(例如Reactor Netty连接器的reactor-http-nio-
)。不过,如果在客户端和服务端同时使用Reactor Netty,默认情况下两者会共享事件循环资源。 - Reactor和RxJava提供了名为“调度器(Scheduler)”的线程池抽象,配合
publishOn
操作符使用,将处理切换到另一个线程池上。调度器的名字声明了其并发策略,例如,“parallel(CPU在限定个数的线程上密集工作)”或“elastic(使用大量线程的I/O密集工作)”。如果看到这些线程,那就意味着某些代码正在使用Scheduler
策略的专用线程池。 - 数据访问库和其他第三方依赖同样可以创建和使用它们自己的线程。
配置
Spring框架并不提供服务器的启动和停止支持。要配置服务器的线程模型,要么使用服务器专属的配置API,要么使用Spring Boot,设置Spring Boot关于不同服务器的不同配置选项。WebClient
可以直接配置。其他的库,则需要分别参考对应文档。
1.2 响应式核心
spring-web
模块包含对响应式web应用的基础支持如下:
- 服务器请求处理有两个级别的支持。
- HttpHandler - 基于非阻塞I/O的HTTP请求处理及响应式流背压的基础约定,以及Reactor Netty、Undertow、Tomcat、Jetty和所有Servlet 3.1+容器的适配器。
-
WebHandler
API - 稍高级,请求处理的多用途web API,在此之上构建了使用注解的控制器和函数式终端等编程模型。
- 对客户端来说,基础的
ClientHttpConnector
约定了通过Reactor Netty适配器和响应式Jetty HttpClient适配器发出基于非阻塞I/O的HTTP请求及响应式流背压。应用中使用的高级WebClient基于这些基础约定构建。 - 不论在客户端还是服务端,都用编解码器来序列化和反序列化HTTP请求和响应的内容。
1.2.1 HttpHandler
HttpHandler是通过单个方法处理请求和响应的基础约定。它被有意地极简化,其主要也是唯一的目的是成为不同HTTP服务器API的最小抽象。
下文表格描述了它所支持的服务器API:
服务器名 | 所用服务器API | 支持的响应式流 |
---|---|---|
Netty | Netty API | Reactor Netty |
Undertow | Undertow API | spring-web:Undertow-响应式流桥接 |
Tomcat | Servlet 3.1非阻塞I/O;Tomcat读取和写入ByteBuffer和byte[]的API | spring-web:Servlet 3.1非阻塞I/O-响应式流桥接 |
Jetty | Servlet 3.1非阻塞I/O;Jetty读取和写入ByteBuffer和byte[]的API | spring-web:Servlet 3.1非阻塞I/O-响应式流桥接 |
Servlet 3.1容器 | Servlet 3.1非阻塞I/O | spring-web:Servlet 3.1非阻塞I/O-响应式流桥接 |
下文表格描述了服务的依赖(同时参见支持的版本):
服务器名 | 组织ID | 包名 |
---|---|---|
Reactor Netty | io.projectreactor.netty | reactor-netty |
Undertow | io.undertow | undertow-core |
Tomcat | org.apache.tomcat.embed | tomcat-embed-core |
Jetty | org.eclipse.jetty | jetty-server, jetty-servlet |
下面的代码展示了HttpHandler
和每种服务器API的配合使用:
Reactor Netty
Java:
HttpHandler handler = ...
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create().host(host).port(port).handle(adapter).bind().block();
Kotlin:
val handler: HttpHandler = ...
val adapter = ReactorHttpHandlerAdapter(handler)
HttpServer.create().host(host).port(port).handle(adapter).bind().block()
Undertow
Java:
HttpHandler handler = ...
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
server.start();
Kotlin:
val handler: HttpHandler = ...
val adapter = UndertowHttpHandlerAdapter(handler)
val server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build()
server.start()
Tomcat
Java:
HttpHandler handler = ...
Servlet servlet = new TomcatHttpHandlerAdapter(handler);
Tomcat server = new Tomcat();
File base = new File(System.getProperty("java.io.tmpdir"));
Context rootContext = server.addContext("", base.getAbsolutePath());
Tomcat.addServlet(rootContext, "main", servlet);
rootContext.addServletMappingDecoded("/", "main");
server.setHost(host);
server.setPort(port);
server.start();
Kotlin:
val handler: HttpHandler = ...
val servlet = TomcatHttpHandlerAdapter(handler)
val server = Tomcat()
val base = File(System.getProperty("java.io.tmpdir"))
val rootContext = server.addContext("", base.absolutePath)
Tomcat.addServlet(rootContext, "main", servlet)
rootContext.addServletMappingDecoded("/", "main")
server.host = host
server.setPort(port)
server.start()
Jetty
Java:
HttpHandler handler = ...
Servlet servlet = new JettyHttpHandlerAdapter(handler);
Server server = new Server();
ServletContextHandler contextHandler = new ServletContextHandler(server, "");
contextHandler.addServlet(new ServletHolder(servlet), "/");
contextHandler.start();
ServerConnector connector = new ServerConnector(server);
connector.setHost(host);
connector.setPort(port);
server.addConnector(connector);
server.start();
Kotlin:
val handler: HttpHandler = ...
val servlet = JettyHttpHandlerAdapter(handler)
val server = Server()
val contextHandler = ServletContextHandler(server, "")
contextHandler.addServlet(ServletHolder(servlet), "/")
contextHandler.start();
val connector = ServerConnector(server)
connector.host = host
connector.port = port
server.addConnector(connector)
server.start()
Servlet 3.1+ 容器
要想以WAR的形式在任意Servlet 3.1+ 容器上部署,需要在WAR中继承并引入AbstractReactiveWebInitializer
。该类使用ServletHttpHandlerAdapter
封装了HttpHandler
并将其注册为一个Servlet
。
1.2.2 WebHandler
API
基于HttpHandler
约定构建的org.springframework.web.server
包提供了通过一条具有单个WebHandler
、多个WebFilter
及多个WebExceptionHandler
组件组成的链处理请求的多用途web API。该链条可通过简单指定Spring ApplicationContext
的自动检测位置或在构建器中注册组件的方式和WebHttpHandlerBuilder
放在一起。
尽管HttpHandler
对使用不同HTTP服务器进行抽象的目的简单,WebHandler
API的目标却是提供大量web应用中广泛使用的功能。例如:
- 用户会话及属性。
- 请求属性。
- 请求中解析得到的
Locale
和Principal
。 - 数据解析和缓存的访问。
- 多段数据的抽象。
- 其他…
特别的Bean类型
下表列举的是在Spring应用上下文中WebHandlerBuilder
可自动检测的或可直接注册在其上的组件:
Bean名称 | Bean类型 | 数量 | 描述 |
---|---|---|---|
任意 | WebExceptionHandler |
0..N | 提供对来自WebFilter 链实例及目标WebHandler 的异常的处理。更多详情参考异常。 |
任意 | WebFilter |
0..N | 在过滤器链其它实例和目标WebHandler 之前和之后提供拦截式风格逻辑。更多详情参考过滤器。 |
webHandler |
WebHandler |
1 | 请求处理器。 |
webSessionManager |
WebSessionManager |
0..1 | 通过ServerWebExchange 中一个方法暴露WebSession 实例的管理器。默认使用DefaultWebSessionManager 。 |
serverCodecConfigurer |
ServerCodecConfigurer |
0..1 | 访问HttpMessageReader 实例解析数据和多段数据并在之后通过ServerWebExchange 中的方法暴露。默认使用ServerCodecConfigurer.create() 。 |
localContextResolver |
LocalContextResolver |
0..1 | 通过ServerWebExchange 暴露的LocaleContext 的解析器。默认使用AcceptHeaderLocaleContextResolver 。 |
forwardedHeaderTransformer |
ForwardedHeaderTransformer |
0..1 | 处理转发类头,对其扩展或删除。默认不使用。 |
表单数据
ServerWebExchange
暴露了如下访问表单数据的方法:
Java:
Mono<MultiValueMap<String, String>> getFormData();
Kotlin:
suspend fun getFormData(): MultiValueMap<String, String>
DefaultServerWebExchange
使用配置的HttpMessageReader
解析表单数据(application/x-www-form-urlencoded
)至一个MultiValueMap
中。默认情况下,ServerCodecConfigurer
bean配置使用FormHttpMessageReader
(参考Web Handler API)。
多段数据
ServerWebExchange
暴露了如下访问多段数据的方法:
Java:
Mono<MultiValueMap<String, Part>> getMultipartData();
Kotlin:
suspend fun getMultipartData(): MultiValueMap<String, Part>
DefaultServerWebExchange
使用配置的HttpMessageReader<MultiValueMap<String, Part>>
解析multipart/form-data
内容至一个MultiValueMap
中。目前,唯一一个被支持的第三方库是Synchronoss NIO Multipart,我们也只知道这一个多段数据请求非阻塞解析库。它通过ServerCodecConfigurer
配置(参考Web Handler API)。
要以流的形式解析多段数据,可以使用HttpMessageReader<Part>
返回的Flux<Part>
代替。例如,在使用注解的控制器中,使用@RequestPart
意味着使用Map
式根据名称访问各个部分,意味着需要将多段数据完全解析出来。与之相反,可以使用@RequestBody
将内容解码为Flux<Part>
而不将其聚合为MultiValueMap
。
转发头
对于穿过代理(如负载均衡)的请求,其主机名、端口和协议可能会改变,从客户端的角度,这带来了创建指向正确主机名、端口和协议的挑战。
RFC 7239定义了Forwarded
HTTP头,代理可以使用它提供原始请求的信息。此外也有一些非标准定义的头,包括X-Forwarded-Host
、X-Forwareded-Port
、X-Forwarded-Ssl
和X-Forwarded-Prefix
等。
ForwardedHeaderTransformer
是一个根据转发头修改请求主机名、端口和协议并在之后移除这些头信息的组件。您可以使用forwardedHeaderTransformer
作为名称声明这个组件,它会被检测到并使用。
需要考虑转发头的安全问题,因为应用不知道这些头信息是由代理按预想添加的,还是由客户端饿一天假的。这就是可信任的边界代理需要配置移除来自外界的不可信转发的原因。您也可以配置ForwardedHeaderTransformer
的removeOnly=true
,直接移除而不使用这些头信息。
在5.1中,
ForwardedHeaderFilter
已过时并由ForwardedHeaderTransformer
代替,转发头可以在交换器创建之前更早被处理。如果依然配置了该过滤器,会从过滤器列表中自动移除它,并使用ForwardedHeaderTransformer
代替。
1.2.3 过滤器
在WebHandler
API中,可以通过WebFilter
在过滤器链其它实例和目标WebHandler
之前和之后添加拦截器风格的逻辑。在使用WebFlux Config时,WebFilter
的注册和声明Spring bean一样简单,此外,(可选的)优先级的指定可以通过在Bean声明上使用@Order
注解或使之实现Ordered
接口。
CORS
Spring WebFlux提供了通过控制器注解对CORS的细粒度配置支持。然而,在使用Spring Security时,建议转用内建的CorsFilter
,且必须将其顺序放在Spring Security的过滤器链之前。
1.2.4 异常
在WebHandler
API中,可以使用WebExceptionHander
处理来自WebFilter
链实例及目标WebHandler
的异常。在使用WebFlux Config时,WebExceptionHander
的注册和声明Spring bean一样简单,此外,(可选的)优先级的指定可以通过在Bean声明上使用@Order
注解或使之实现Ordered
接口。
下表描述了可用的WebExceptionHandler
实现:
异常处理器 | 描述 |
---|---|
ResponseStatusExceptionHandler |
提供对ResponseStatusException 类型的异常的处理,设置HTTP响应该异常对应的状态码 |
WebFluxResponseStatusExceptionHandler |
扩展ResponseStatusExceptionHandler ,同时支持任何异常类型上的@ResponseStatus 注解。该处理器在WebFlux Config中声明。 |
1.2.5 编解码
spring-web
和spring-core
模块通过基于响应式流背压的非阻塞I/O提供了对高级对象到字节内容的序列化和反序列化支持。下面是对该支持的描述:
- Encoder和Decoder是独立于HTTP的底层编码和解码约定。
- HttpMessageReader和HttpMessageWriter是对HTTP消息内容编码和解码的约定。
-
Encoder
可通过EncoderHttpMessageWriter
封装适配web应用使用,同理Decoder
可通过DecoderHttpMessageReader
封装适配web应用使用。 -
DataBuffer
是对不同种字节缓冲区表示的抽象(如Netty的ByteBuf
、java.nio.ByteBuffer
等),所有编解码器都在其之上工作。参考《Spring核心》中的数据缓冲区及编解码器了解更多内容。
spring-core
模块提供了对byte[]
、ByteBuffer
、DataBuffer
、Resource
及String
的编解码器实现。spring-web
模块提供了Jackson JSON、Jackson Smile、JAXB2、Protocol Buffer等对专为HTTP消息(表单数据、多段内容、服务器事件等)的编解码器实现。
ClientCodedConfigurer
和ServerCodecCofigurer
通常用于在应用中配置或自定义编解码器。请参考HTTP 消息编解码配置小节。
Jackson JSON
使用Jackson库,可以提供JSON和二进制JSON(Smile)的支持。
Jackson2Decoder
的方式工作如下:
- 使用Jackson的异步非阻塞解析器聚合一段流为
TokenBuffer
字节块,每一块代表一个JSON对象。 - 每个
TokenBuffer
都会传给Jackson的ObjectMapper
用于创建高级对象。 - 在解码单值发布者(如
Mono
)时,只有一个TokenBuffer
。 - 在解码多值发布者(如
Flux
)时,每个TokenBuffer
在接收到足以表示完整对象的字节后生成并传给ObjectMapper
。输入的内容可以是JSON数组,如果content-type是application/stream+json
的话亦可以是行界定JSON。
Jaclson2Encoder
的工作方式如下:
- 单值发布者(如
Mono
),直接通过ObjectMapper
序列化。 - 对于
application/json
形式的多值发布者,默认使用Flux#collectToList()
收集值,并在之后序列化结果集。 - 对于流媒体类型的多值发布者,如
application/stream+json
或application/stream-x-jackson-smile
,通过行界定JSON格式分别编码、写入或回刷每个值。 - 对SSE,每个事件都会调用
Jackson2Encoder
,回刷输出以保证传输无延迟。
默认情况下
Jacoson2Encoder
和Jackson2Decoder
都不支持String
类型。作为替代,默认假设字符串或一系列字符串代表已序列化的JSON内容,会通过CharSequenceEncoder
渲染。如果需要从Flux<String>
中渲染出JSON数组,则需使用Flux#collectToList()
并编码为Mono<List<String>>
。
表单数据
FormHttpMessageReader
和FormHttpMessageWriter
支持application/x-www-form-urlencoded
类型内容的编解码。
在服务器端,表单内容可能需要在多个地方访问。ServerWebExchange
专门提供了一个getFormData()
方法,通过FormHttpMessageReader
解析内容并缓存结果以待后续重复访问。参考WebHandler
API的表单数据小节。
一旦使用了getFormData()
方法,就不能再从请求体中读取原始内容了。因此,应用需要统一通过ServerWebExchange
获取数据的方式,要么访问已缓存的表单数据,要么读取原始请求体。
多段数据
MultipartHttpMessageReader
和MultipartHttpMessageWriter
支持multipart/form-data
类型内容的编解码。MultipartHttpMessageReader
代理其他HttpMessageReader
真正Flux<Part>
解析的执行,并在之后简单的将各部分收集为MultiValueMap
。当前使用Synchronoss NIO Multipart作为解析库。
在服务器端,多段表单内容可能需要在多个地方访问。ServerWebExchange
专门提供了一个getMultipartData()
方法,通过MultipartHttpMessageReader
解析内容并缓存结果以待后续重复访问。参考WebHandler
API的多段数据小节。
一旦使用了getMultipartData()
方法,就不能再从请求体中读取原始内容了。因此,应用要么反复调用getMultipartData()
方法并使用Map式访问内容,要么依赖SynchronossPartHttpMessageReader
一次性访问Flux<Part>
。
流
当在HTTP响应中使用流(如text/event-stream
和application/stream+json
)时,为了可靠地监测到迟早会断开连接的客户端,周期性的数据发送非常重要。这种数据发送可以是仅注释的空SSE事件,或其他任何可以有效充当心跳的“无操作”数据。
DataBuffer
DataBuffer
是WebFlux中对字节缓冲区的表示。Spring核心参考文档的数据缓冲区和编解码器小节有更多相关内容。要理解的关键点是在像Netty等服务器中,字节缓冲区会通过池和引用计数管理并在消耗后释放,以避免内存泄漏。
WebFlux应用通常无需关注这些问题,除非直接建立或消耗字节缓冲区,而不是依赖编解码器对高级对象进行转换。或者,也有可能是他们决定创建自定义的编解码器。对这些场景,请复习《数据缓冲区和编解码器》小节,特别是《DataBuffer的使用》。
日志
Spring WebFlux中DEBUG级别日志的设计理念是小巧、极简且对用户友好。它重点关注一遍又一遍出现的高价值信息,而且他信息则仅在调试特定问题时才有用。
TRACE级别的日志遵循和DEBUG级别相同的理论(例如,同样不能成坨输出)但其可以用于问题调试。此外一些日志信息在TRACE级别和DEBUG级别以不同的详细程度展示。
好的日志记录源自使用日志框架的经验。如果您发现任何不符合既定目标的地方,请告诉我们。
日志ID
在WebFlux中,一次请求会在多个线程中执行,线程ID在分辨属于特定请求的日志信息上没什么用。这就是WebFlux默认在日志消息上加入请求专属ID的原因。
在服务器端,日志ID存放在ServerWebExchange
属性(LOG_ID_ATTRIBUTE
)中,同时可通过ServletWebExchange#getLogPrefix()
方法获取基于该ID的完整格式化的前缀。在WebClient
端,日志ID存在ClientRequest
属性(LOG_ID_ATTRIBUTE
)中同时可通过ClientRequest#logPrefix()
方法获取基于该ID的完整格式化的前缀。
敏感数据
DEBUG
和TRACE
日志可记录敏感信息。这就是表单参数及头信息默认被隐去的原因,如有需要,必须明确启用在此之上的完整日志记录。
下文示例展示了如何开启服务端请求的全量日志:
Java:
@Configuration
@EnableWebFlux
class MyConfig implements WebFluxConfigurer {
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.defaultCodecs().enableLoggingRequestDetails(true);
}
}
Kotlin:
@Configuration
@EnableWebFlux
class MyConfig : WebFluxConfigurer {
override fun configureHttpMessageCodecs(configurer: ServerCodecConfigurer) {
configurer.defaultCodecs().enableLoggingRequestDetails(true)
}
}
下文示例展示了如何开启客户端请求的全量日志:
Java:
Consumer<ClientCodecConfigurer> consumer = configurer ->
configurer.defaultCodecs().enableLoggingRequestDetails(true);
WebClient webClient = WebClient.builder()
.exchangeStrategies(ExchangeStrategies.builder().codecs(consumer).build())
.build();
Kotlin:
val consumer: (ClientCodecConfigurer) -> Unit = { configurer -> configurer.defaultCodecs().enableLoggingRequestDetails(true) }
val webClient = WebClient.builder()
.exchangeStrategies(ExchangeStrategies.builder().codecs(consumer).build())
.build()
(未完待续)