深入剖析 Spring WebFlux

一、WebFlux 简介

WebFlux 是 Spring Framework5.0 中引入的一种新的反应式Web框架。通过Reactor项目实现Reactive Streams规范,完全异步和非阻塞框架。本身不会加快程序执行速度,但在高并发情况下借助异步IO能够以少量而稳定的线程处理更高的吞吐,规避文件IO/网络IO阻塞带来的线程堆积。

1.1 WebFlux 的特性

WebFlux 具有以下特性:

  • 异步非阻塞 - 可以举一个上传例子。相对于 Spring MVC 是同步阻塞IO模型,Spring WebFlux这样处理:线程发现文件数据没传输好,就先做其他事情,当文件准备好时通知线程来处理(这里就是输入非阻塞方式),当接收完并写入磁盘(该步骤也可以采用异步非阻塞方式)完毕后再通知线程来处理响应(这里就是输出非阻塞方式)。

  • 响应式函数编程 - 相对于Java8 Stream 同步、阻塞的Pull模式,Spring Flux 采用Reactor Stream 异步、非阻塞Push模式。书写采用 Java lambda 方式,接近自然语言形式且容易理解。

  • 不拘束于Servlet - 可以运行在传统的Servlet 容器(3.1+版本),还能运行在Netty、Undertow等NIO容器中。

1.2 WebFlux 的设计目标

  • 适用高并发

  • 高吞吐量

  • 可伸缩性

二、Spring WebFlux 组件介绍

2.1 HTTPHandler

一个简单的处理请求和响应的抽象,用来适配不同HTTP服务容器的API。

image

2.2 WebHandler

一个用于处理业务请求抽象接口,定义了一系列处理行为。相关核心实现类如下;

image

2.3 DispatcherHandler

请求处理的总控制器,实际工作是由多个可配置的组件来处理。

image

WebFlux是兼容Spring MVC 基于@Controller,@RequestMapping等注解的编程开发方式的,可以做到平滑切换。

2.4 Functional Endpoints

这是一个轻量级函数编程模型。是基于@Controller,@RequestMapping等注解的编程模型的替代方案,提供一套函数式API 用于创建Router,Handler和Filter。调用处理组件如下:

image

简单的RouterFuntion 路由注册和业务处理过程:

@Bean
public RouterFunction<ServerResponse> initRouterFunction() {
    return RouterFunctions.route()
        .GET("/hello/{name}", serverRequest -> {
            String name = serverRequest.pathVariable("name");
            return ServerResponse.ok().bodyValue(name);
        }).build();
}

请求转发处理过程:

image

2.5 Reactive Stream

这是一个重要的组件,WebFlux 就是利用Reactor 来重写了传统Spring MVC 逻辑。其中Flux和Mono 是Reactor中两个关键概念。掌握了这两个概念才能理解WebFlux工作方式。

Flux和Mono 都实现了Reactor的Publisher接口,属于时间发布者,对消费者提供订阅接口,当有事件发生的时候,Flux或者Mono会通过回调消费者的相应的方法来通知消费者相应的事件。这就是所谓的响应式编程模型。

Mono工作流程图

image

只会在发送出单个结果后完成。

Flux工作流程图

image

发送出零个或者多个,可能无限个结果后才完成。

对于流式媒体类型:application/stream+json 或者 text/event-stream ,可以让调用端获得服务器滚动结果。
对于非流类型:application/json  WebFlux 默认JSON编码器会将序列化的JSON 一次性刷新到网络,这并不意味着阻塞,因为结果Flux<?> 是以反应式方式写入网络的,没有任何障碍。

三、WebFlux 工作原理

3.1 组件装配过程

image

流程相关源码解析-WebFluxAutoConfiguration

@Configuration
//条件装配 只有启动的类型是REACTIVE时加载
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
//只有存在 WebFluxConfigurer实例  时加载
@ConditionalOnClass(WebFluxConfigurer.class)
//在不存在  WebFluxConfigurationSupport实例时 加载
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
//在之后装配
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class,
      CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })
//自动装配顺序
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {
   @Configuration
   @EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class })
   //接口编程 在装配WebFluxConfig 之前要先 装配EnableWebFluxConfiguration
   @Import({ EnableWebFluxConfiguration.class })
   public static class WebFluxConfig implements WebFluxConfigurer {
      //隐藏部分源码
     /**
     * Configuration equivalent to {@code @EnableWebFlux}.
     */
   } 
    @Configuration
    public static class EnableWebFluxConfiguration
            extends DelegatingWebFluxConfiguration {
        //隐藏部分代码
    }
    @Configuration
    @ConditionalOnEnabledResourceChain
    static class ResourceChainCustomizerConfiguration {
        //隐藏部分代码
    }
    private static class ResourceChainResourceHandlerRegistrationCustomizer
            implements ResourceHandlerRegistrationCustomizer {
        //隐藏部分代码
    }

WebFluxAutoConfiguration 自动装配时先自动装配EnableWebFluxConfiguration 而EnableWebFluxConfiguration->DelegatingWebFluxConfiguration ->WebFluxConfigurationSupport。

最终WebFluxConfigurationSupport 不仅配置DispatcherHandler 还同时配置了其他很多WebFlux核心组件包括 异常处理器WebExceptionHandler,映射处理器处理器HandlerMapping,请求适配器HandlerAdapter,响应处理器HandlerResultHandler 等。

DispatcherHandler 创建初始化过程如下;

public class WebFluxConfigurationSupport implements ApplicationContextAware {
   //隐藏部分代码
   @Nullable
   public final ApplicationContext getApplicationContext() {
      return this.applicationContext;
   }
    //隐藏部分代码
   @Bean
   public DispatcherHandler webHandler() {
      return new DispatcherHandler();
   }
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
   @Nullable
   private List<HandlerMapping> handlerMappings;
   @Nullable
   private List<HandlerAdapter> handlerAdapters;
   @Nullable
   private List<HandlerResultHandler> resultHandlers;
    @Override
   public void setApplicationContext(ApplicationContext applicationContext) {
        initStrategies(applicationContext);
   }
   protected void initStrategies(ApplicationContext context) {
      //注入handlerMappings
      Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerMapping.class, true, false);

      ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
      AnnotationAwareOrderComparator.sort(mappings);
      this.handlerMappings = Collections.unmodifiableList(mappings);
      //注入handlerAdapters
      Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerAdapter.class, true, false);

      this.handlerAdapters = new ArrayList<>(adapterBeans.values());
      AnnotationAwareOrderComparator.sort(this.handlerAdapters);
      //注入resultHandlers
      Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
            context, HandlerResultHandler.class, true, false);

      this.resultHandlers = new ArrayList<>(beans.values());
      AnnotationAwareOrderComparator.sort(this.resultHandlers);
   }

流程相关源码解析-HTTPHandlerAutoConfiguration

上面已讲解过WebFlux 核心组件装载过程,那么这些组件又是什么时候注入到对应的容器上下文中的呢?其实是在刷新容器上下文时注入进去的。

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh

public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
      implements ConfigurableWebServerApplicationContext {
   @Override
   protected void onRefresh() {
      super.onRefresh();
      try {
         createWebServer();
      }
      catch (Throwable ex) {
         throw new ApplicationContextException("Unable to start reactive web server", ex);
      }
   }
   private void createWebServer() {
      WebServerManager serverManager = this.serverManager;
      if (serverManager == null) {
         String webServerFactoryBeanName = getWebServerFactoryBeanName();
         ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
         boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
         // 这里创建容器管理时注入httpHandler
         this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
         getBeanFactory().registerSingleton("webServerGracefulShutdown",
               new WebServerGracefulShutdownLifecycle(this.serverManager));
         // 注册一个 web容器启动服务类,该类继承了SmartLifecycle
         getBeanFactory().registerSingleton("webServerStartStop",
               new WebServerStartStopLifecycle(this.serverManager));
      }
      initPropertySources();
   }
   protected HttpHandler getHttpHandler() {
        String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);
        if (beanNames.length == 0) {
            throw new ApplicationContextException(
                    "Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");
        }
        if (beanNames.length > 1) {
            throw new ApplicationContextException(
                    "Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "
                            + StringUtils.arrayToCommaDelimitedString(beanNames));
        }
        //容器上下文获取httpHandler
        return getBeanFactory().getBean(beanNames[0], HttpHandler.class);
    }

而这个HTTPHandler 是由HTTPHandlerAutoConfiguration装配进去的。

@Configuration
@ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnMissingBean(HttpHandler.class)
@AutoConfigureAfter({ WebFluxAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class HttpHandlerAutoConfiguration {
   @Configuration
   public static class AnnotationConfig {
      private ApplicationContext applicationContext;
      public AnnotationConfig(ApplicationContext applicationContext) {
         this.applicationContext = applicationContext;
      }
      //构建WebHandler
      @Bean
      public HttpHandler httpHandler() {
         return WebHttpHandlerBuilder.applicationContext(this.applicationContext)
               .build();
      }
   }

流程相关源码解析-web容器

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer 。在创建WebServerManager 容器管理器时会获取对应web容器实例,并注入响应的HTTPHandler。

class WebServerManager {
   private final ReactiveWebServerApplicationContext applicationContext;
   private final DelayedInitializationHttpHandler handler;
   private final WebServer webServer;
   WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
         Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
      this.applicationContext = applicationContext;
      Assert.notNull(factory, "Factory must not be null");
      this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
      this.webServer = factory.getWebServer(this.handler);
   }
}

以Tomcat 容器为例展示创建过程,使用的是 TomcatHTTPHandlerAdapter 来连接Servlet 请求到HTTPHandler组件。

public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory {
    //隐藏部分代码   
    @Override
    public WebServer getWebServer(HttpHandler httpHandler) {
        if (this.disableMBeanRegistry) {
            Registry.disableRegistry();
        }
        Tomcat tomcat = new Tomcat();
        File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
        tomcat.setBaseDir(baseDir.getAbsolutePath());
        Connector connector = new Connector(this.protocol);
        connector.setThrowOnFailure(true);
        tomcat.getService().addConnector(connector);
        customizeConnector(connector);
        tomcat.setConnector(connector);
        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());
        for (Connector additionalConnector : this.additionalTomcatConnectors) {
            tomcat.getService().addConnector(additionalConnector);
        }
        TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);
        prepareContext(tomcat.getHost(), servlet);
        return getTomcatWebServer(tomcat);
    }
}

最后Spring容器加载后通过SmartLifecycle实现类WebServerStartStopLifecycle 来启动Web容器。

WebServerStartStopLifecycle 注册过程详见:org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer

3.2 完整请求处理流程

image

(引用自:https://blog.csdn.net)

该图给出了一个HTTP请求处理的调用链路。是采用Reactor Stream 方式书写,只有最终调用 subscirbe 才真正执行业务逻辑。基于WebFlux 开发时要避免controller 中存在阻塞逻辑。列举下面例子可以看到Spring MVC 和Spring Webflux 之间的请求处理区别。

@RestControllerpublic
class TestController {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @GetMapping("sync")
    public String sync() {
        logger.info("sync method start");
        String result = this.execute();
        logger.info("sync method end");
        return result;
    }
    @GetMapping("async/mono")
    public Mono<String> asyncMono() {
        logger.info("async method start");
        Mono<String> result = Mono.fromSupplier(this::execute);
        logger.info("async method end");
        return result;
    }
    private String execute() {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }
}

日志输出

2021-05-31 20:14:52.384  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method start
2021-05-31 20:14:57.385  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method end
2021-05-31 20:15:09.659  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method start
2021-05-31 20:15:09.660  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method end

从上面例子可以看出sync() 方法阻塞了请求,而asyncMono() 没有阻塞请求并立刻返回的。asyncMono() 方法具体业务逻辑 被包裹在了Mono 中Supplier中的了。当execute 处理完业务逻辑后通过回调方式响应给浏览器。

四、存储支持

一旦控制层使用了 Spring Webflux 则安全认证层、数据访问层都必须使用 Reactive API 才真正实现异步非阻塞。

NOSQL Database

  • MongoDB (org.springframework.boot:spring-boot-starter-data-mongodb-reactive)。

  • Redis(org.springframework.boot:spring-boot-starter-data-redis-reactive)。

Relational Database

  • H2 (io.r2dbc:r2dbc-h2)

  • MariaDB (org.mariadb:r2dbc-mariadb)

  • Microsoft SQL Server (io.r2dbc:r2dbc-mssql)

  • MySQL (dev.miku:r2dbc-mysql)

  • jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)

  • Postgres (io.r2dbc:r2dbc-postgresql)

  • Oracle (com.oracle.database.r2dbc:oracle-r2dbc)

五、总结

关于Spring MVC 和Spring WebFlux 测评很多,本文引用下做简单说明。参考:《Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC》。

基本依赖

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-r2dbc</artifactId> 
</dependency> 
<!-- r2dbc 连接池 --> 
<dependency> 
    <groupId >io.r2dbc</groupId> 
    <artifactId>r2dbc-pool</artifactId> 
</dependency> 
<!--r2dbc mysql 库--> 
<dependency> 
    <groupId>dev.miku</groupId> 
    <artifactId>r2dbc- mysql</artifactId> 
</dependency> 
<!--自动配置需要引入一个嵌入式数据库类型对象--> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-data-jdbc</artifactId> 
</dependency>
<!-- 反应方程式 web 框架 webflux--> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-webflux</artifactId> 
</dependency>

相同数据下效果如下

image

Spring MVC + JDBC 在低并发下表现最好,但 WebFlux + R2DBC 在高并发下每个处理请求使用的内存最少。

image

Spring WebFlux + R2DBC 在高并发下,吞吐量表现优异。

作者:vivo互联网服务器团队-Zhou Changqing

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容