spring webflux(二)

   上一篇讲了webflux的简单使用,但如果深入点想就会有很多疑问。webflux是如何同netty协作的?响应式的线程是如何调度的?一个请求是怎么来到我们定义的RequestMapping方法的?本篇通过对webflux源码的阅读,简要分析这几点。
   spring webflux基于reactor,默认的容器为netty,所以想学习spring webflux的源码,必须要有这两个技术的知识。这里先简要介绍一下。

Netty

   Netty是一个高性能、异步事件驱动的NIO框架,提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的。
image.png

   上图是netty的基本工作流程,简单来说:
· 一个EventLoopGroup包含一个或多个EventLoop。
· 一个EventLoop在生命中周期绑定到一个Thread上。
· EventLoop使用其对应的Thread处理IO事件。
· 一个Channel使用EventLoop进行注册。
· 一个EventLoop可被分配至一个或多个Channel。
   EventLoop除了要负责处理绑定的Channel所有io操作,由于其继承了Executor接口,还可以执行提交的任务。需要注意的是,提交给EventLoop的任务必须是非阻塞的,否则将使io处理没有资源,导致整个应用吞吐量下降。
   netty处理数据流程如下:


image.png

   数据从客户端传入服务端称为出站,反之称为入站。一个socket链接为一个channel,一个channel有一个channelpipeline,pipeline是入站处理器和出站处理器的链式集合。当一个请求msg从某个channel入站时,将从该channel绑定的pipeline的head开始,经过一个个ChannelInboundHandler入站处理器的处理。反之,当一个响应msg从channel出站时,将从该channel绑定的pipeline的tail开始,经过一个个ChannelOutboundHandler入站处理器的处理。

   当某个ChannelHandler被添加到ChannelPipeline中时,会为其创建一个ChannelHandlerContext,ChannelHandler可以访问其绑定的ChannelHandlerContext,从而和pipeline交互。有点类似拦截器链,ChannelHandler通过ChannelHandlerContext将数据交给pipe中的下一个处理器处理。

Reactor

   Reactor的核心是Mono和Flux两个类,他们都继承了Publisher接口,代表一个数据流的发布者。其中,一个Flux代表一个0~N个元素的序列发射源,而Mono代表只有0或1个元素的发射源。可以通过subscribe()方法订阅发射源,类似java中的stream操作,在执行subscribe()之前,Mono和Flux并不会开始发射数据。
   Mono和Flux提供了丰富的api可以进行链式调用,并且可以通过subscribeOn()或者publishOn()指定Mono中某一步操作的执行线程。如果不指定Mono或Flux的执行线程,那么默认会在调用subscribe()的线程上运行,这一点也和Stream相似。

    public Mono<String> say(String name) {
        return Mono.just(name)
            .publishOn(Schedulers.elastic())
            .map(Try.of(this::hello));
    }

    private String hello(String name) throws InterruptedException {
        Thread.sleep(10000);
        String result = String.format("hello %s, current-thread is [%s]", name, Thread.currentThread().getName());
        System.out.println(result);
        return result;
    }

上述代码将打印:
main thread
hello nihao, current-thread is [elastic-2]
但如果将
.publishOn(Schedulers.elastic())注释那么结果是:
hello nihao, current-thread is [main]
main thread
从这里可以看出Reactor的优点,她极大的简化了异步编程中线程切换处理的难度。

Spring Webflux

   回到Webflux,他是如何工作的?可以从@EnableWebFlux注解开始看,这里略去复杂的过程直接说结果。首先,WebFlux的核心仍然和MVC一样是Dispatcher,另外简单来说,可以将整个过程描述为两步:
一、向Netty注册ChannelHandler
二、Netty调用ChannelHandler
先说向Netty注册ChannelHandler。

  1. 容器启动,@EnableWebFlux注解引入的DelegatingWebFluxConfiguration配置类向容器注册基础bean,包括DispatcherHandler、WebFluxResponseStatusExceptionHandler、RequestMappingHandlerMapping等。
  2. 向WebHttpHandlerBuilder传入ApplicationContext,WebHttpHandlerBuilder利用context获得WebHandler(Dispatcher)、List<WebFilter>、List<WebExceptionHandler>等bean,并构造HttpHandler(HttpWebHandlerAdapter)。HttpHandler接口的方法:Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response)。很明显,她是Webflux功能的集合和对外接口。
  3. 由于使用Netty,使用ReactorHttpHandlerAdapter是包装HttpHandler,ReactorHttpHandlerAdapter作为WebFlux上层到底层容器Netty的桥梁,类似的还有ServletHttpHandlerAdapter,链接Servlet容器。
  4. 使用httpServer.newHandler(adapter)注册ReactorHttpHandlerAdapter。 httpServer下一层是tcpServer,tcpServer将ReactorHttpHandlerAdapter包装包装成ContextHandler,ContextHandler实现了ChannelInitializer接口,可以用来向Netty注册ChannelHandler
@Override
    public final Mono<? extends NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
        Objects.requireNonNull(handler, "handler");
        return Mono.create(sink -> {
            ServerBootstrap b = options.get();
            SocketAddress local = options.getAddress();
            b.localAddress(local);
            ContextHandler<Channel> contextHandler = doHandler(handler, sink);
            b.childHandler(contextHandler);
            if(log.isDebugEnabled()){
                b.handler(loggingHandler());
            }
            contextHandler.setFuture(b.bind());
        });
    }
  1. ContextHandler在initChannel()->accept()方法中向pipeline中注册了ChannelOperationsHandler,ChannelOperationsHandler实现了Netty的ChannelDuplexHandler接口,可以通过ChannelHandlerContext处理入站和出站数据。
channel.pipeline()
                   .addLast(NettyPipeline.ReactiveBridge,
                           new ChannelOperationsHandler(this));
  1. ChannelOperationsHandler通过channelActive()在channel每次激活时调用了ContextHandler的createOperations()方法创建ChannelOperations。这一步很关键,首先通过channelActive()说明ChannelOperations是与一次请求对应的。另外看createOperations()的代码,有两处:
ChannelOperations<?, ?> op =
                    channelOpFactory.create((CHANNEL) channel, this, msg);
channel.eventLoop().execute(op::onHandlerStart);

这说明了Spring Webflux处理一次请求的方式:

  1. 一个request到Netty时,Netty传递给ChannelOperationsHandler
  2. ChannelOperationsHandler将数据和对数据的操作封装成ChannelOperations
  3. 将ChannelOperations作为一个任务提交给Netty的eventLoop
  4. eventLoop在对一个channel的pipeline调用完成后,将执行提交的任务,此时将进入处理Spring WebFlux的操作。

来看看ChannelOperations做了什么,首先明确几点:ChannelOperations中封装了this(ContextHandler),而ContextHandler中封装了ReactorHttpHandlerAdapter,ReactorHttpHandlerAdapter中则封装了HttpHandler,HttpHandler中则是核心组件WebHandler(Dispatcher)、List<WebFilter>、List<WebExceptionHandler>等的集合。

protected final void applyHandler() {
//      channel.pipeline()
//             .fireUserEventTriggered(NettyPipeline.handlerStartedEvent());
       if (log.isDebugEnabled()) {
           log.debug("[{}] {} handler is being applied: {}", formatName(), channel
                   (), handler);
       }
       try {
           Mono.fromDirect(handler.apply((INBOUND) this, (OUTBOUND) this))
               .subscribe(this);
       }
       catch (Throwable t) {
           log.error("", t);
           channel.close();
       }
   }

关键点又来了,提交给eventLoop 的任务做了什么:

Mono.fromDirect(handler.apply((INBOUND) this, (OUTBOUND) this))
               .subscribe(this);

handler就是包装在ChannelOperations中的ReactorHttpHandlerAdapter,可以看出这里的Mono并没有做线程切换,所以会由当前线程(eventLoop)执行ReactorHttpHandlerAdapter的apply()方法,ReactorHttpHandlerAdapter在其apply()方法里调用了她包装的httpHandler的handle(), 而httpHandler的handle()又调用了Dispatcher的handler():
image.png

从中我们可以得到使用Spring WebFlux的核心法则:

绝对不要阻塞Controller的方法。

因为这将导致eventLoop线程的阻塞,而eventLoop线程数量一般只有cpu核心数*2个,如果阻塞了eventLoop线程将导致真个服务不可用。
下面做个小实验

package com.xinan.demo.rest;

import com.xinan.demo.util.Try;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author xinan
 * @date 2018/8/3
 */
@RestController
@RequestMapping("hello")
@Slf4j
public class HelloController {

    @GetMapping
    public Mono<String> say(String name) {
        return Mono.just(name)
            .map(Try.of(this::hello));
    }

    private String hello(String name) throws InterruptedException {
        Thread.sleep(10000);
        String result = String.format("hello %s, current-thread is [%s]", name, Thread.currentThread().getName());
        System.out.println(result);
        return result;
    }

    @GetMapping("nob")
    public Mono<String> nob() {
        return Mono.just(Thread.currentThread().getName());
    }
}

通过webbench say()同时发送20个请求(大于eventLoop线程数)
webbench -c 20 -t 30 http://localhost:8080/hello/say
再立即通过浏览器访问 http://localhost:8080/hello/nob,可以看到http://localhost:8080/hello/nob接口直到10秒之后才返回结果。这就是阻塞了eventLoop线程的结果,耗尽了线程资源,导致服务不可用。
   本文简要分析了Spring WebFlux的主要工作流程,可以看到WebFlux在线程调用方面和Spring MVC还是又很大不同的,在使用中一定要注意WebFlux的特点,避免错误使用导致性能远低于预期。
   如有错误,恳请批评指正!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容