spring webflux(一)

所有示例代码:https://github.com/cumtbzy2011/webfluxdemo

功能与api

背景

Netty作为java领域首屈一指的nio框架,其以优越的性能被众多中间件所使用。但到了java的web开发领域,却很难享受到Netty的性能优势。其原因在于传统web开发基于servlet容器,许多依赖和开发框架都是基于servlet实现的,比如spring。而netty为了保持代码的简单和高效,并没有实现servlet标准,这就导致将web容器迁移到netty后许多框架和第三方库不能使用,迁移的成本过大。但spring webflux出现改变了这一现状。她在兼容原有mvc开发方式的同时,重写和实现了大量第三方库,在提升性能的同时,降低了迁移的成本。同时spring webflux适配多种web容器,即使仍然使用tomcat也是可以的。

接口声明

接口声明除了保留原有注解式声明的方式,为了满足reactor的编程风格,额外支持了函数式声明的方式。通工具类RouterFunctions过构造RounterFunction对象,并向Spring注入实现函数式接口声明。

    @Bean
    public TestHandler testHandler() {
        return new TestHandler();
    }

    @Bean
    public RouterFunction<ServerResponse> routes(TestHandler testHandler) {
        return RouterFunctions.route(RequestPredicates.POST("/route"),
            testHandler::echoName);
    }

    @GetMapping("anno")
    public String sayHello(String name) {
        return "hello world! " + name;
    }

    class TestHandler {
        public Mono<ServerResponse> echoName(ServerRequest request) {
            return request.bodyToMono(Post.class)
              .map(Post::getName)
              .flatMap(name -> ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromObject("hello world!" + name)));
        }
    }

在WebFlux中,request和respose不再是原来的ServletRequest和ServletRequest,取而代之的是ServerRequest和ServerResponse。这两个对象是webflux新出现的。首先webflux底层如果使用了reactor-netty,那么自然就没有所谓的servlet一说,另外ServerRequest和ServerResponse提供了对non-blocking和backpressure特性的支持,提供了将Http消息内容转换成Mono和Flux的方法,使响应式编程成为了可能。

过滤器Filter

过滤器的使用方法和spring mvc类似,不过与ServerRequest和ServerResponse相同的是,webflux提供了一个新的过滤器接口WebFilter以提供对Mono和Flux的支持。代码如下:

@Component
public class DemoWebFilter implements WebFilter{

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        if (!serverWebExchange.getRequest().getHeaders().containsKey("token")) {
            serverWebExchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            return Mono.empty();
        }
        return webFilterChain.filter(serverWebExchange);
    }
}

值得注意的是Mono<Void>这个返回值,在框架的很多地方都会用到。他意味着一个空的Mono,对于任何读取,他会立刻发出一个complete信号。相比与直接返回void,Mono<Void>作为方法的返回值时,可以对该方法进行链式调用。另外虽然Mono<Void>虽然没有返回值,但是其本身的complete或者error状态,也可以注册回调进行异步处理。

异常处理

在Spring Webflux中,异常分两种。一是controller中方法抛出的异常,这在webflux中同样可以像在mvc中用@ExceptionHandler声明异常处理方法。二是在WebHandler API这种比较偏底层的api,典型的是WebFilter,异常处理使用了支持Mono的新接口:WebExceptionHandler,可用于处理来自WebFilter链和WebHandler的异常。使用WebExceptionHandler时,只要将其声明为Spring bean即可自动注入并使用,并可选择通过bean声明上的@Order或通过实现Ordered来表示优先级。需要注意的是webflux有默认的WebExceptionHandler-DefaultErrorWebExceptionHandler,其order为默认的-1。如果我们想自定义WebExceptionHandler,那么必须将order声明为-2以上,否则异常将不会传递到我们自定义的WebExceptionHandler中。

@Component
//要比DefaultErrorWebExceptionHandler优先级-1高
//比较底层,如果异常被@ExceptionHandler处理了,那么将不会由此处理
//可以处理filter和webHandler中的异常
@Order(-2)
public class ErrorLogHandler implements WebExceptionHandler {
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        exchange.getResponse().setStatusCode(HttpStatus.OK);
        byte[] bytes = ("ErrorLogHandler: " + ex.getMessage()).getBytes(StandardCharsets.UTF_8);
        DataBuffer wrap = exchange.getResponse().bufferFactory().wrap(bytes);
        return exchange.getResponse().writeWith(Flux.just(wrap));
    }
}
@ExceptionHandler(Exception.class)
    public String test(Exception e) {
        return "@ExceptionHandler: " + e.getMessage();
}

Multipart和Stream

在基础框架reactor中Mono代表一个单次发送的数据源,而Flux代表一个可多次发送的数据源。在spring webflux的controller中,Mono很好理解,代表前端的一次传参或接口的一次返回。那么Flux该如何使用呢?简单来说Flux在这两个场景下使用:接受Multipart参数、返回Stream类型数据或者用于分批返回。代码如下:

@PostMapping(value = "", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    Mono<String> requestBodyFlux(@RequestBody Flux<Part> parts) {
        return parts.map(part -> part instanceof FilePart
              ? part.name() + ":" + ((FilePart) part).filename()
              : part.name())
          .collect(Collectors.joining(",", "[", "]"));
    }

    //如果不是application/stream json則呼叫端無法滾動得到結果,將一直阻塞等待資料流結束或超時。
    @GetMapping(value = "stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Post> getBeanStream() {
        return Flux.interval(Duration.ofMillis(500))
          .map(l -> new Post("bian", LocalDateTime.now()))
          .log();
    }

Multipart是Htp请求的一种常见的数据结构,常用于表单提交。在spring mvc中,表单中的每个键值对会映射成一个个part。到了webflux,自然而然地转换成代表多个表单字段Flux。而返回值Flux,则对应了一种新的MediaType:APPLICATION_STREAM_JSON_VALUE。他的使用需要浏览器或者客户端的支持。从使用中来看,浏览器会对每一次返回的数据分批处理。如果简单的get调用,会在页面滚动打印返回值,直到Flux发射完成:


image.png

而如果接口并没有声明produces = MediaType.APPLICATION_STREAM_JSON_VALUE的媒体类型,浏览器将会在Flux所有数据发射完毕后一次性打印。

WebSocket

在webflux中使用WebSocket功能很简单,只要注册WebSocketHandlerAdapter用于websocket协议的握手,再定义对应路径的websocket消息处理器即可:

@Configuration
@ComponentScan
@EnableWebFlux
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", new EchoWebSocketHandler());
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        return mapping;
    }
    
    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter(){
        return new WebSocketHandlerAdapter();
    }
}
public class EchoWebSocketHandler implements WebSocketHandler {

    public EchoWebSocketHandler() {
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(    //1. 向一个websocket连接发送一段消息
          session.receive()     //2. 获得入站消息流
            .doOnNext(          //3. 对每一个websocket消息进行处理,相当于stream的map,返回的仍是一个流
              WebSocketMessage::retain  //4. 保留消息(主要针对池化内存(内部使用了netty的ByteBuf),使之引用计数+1,避免过早回收)
            )
        );
    }
}

需要注意的是,通过webSocketSession.receive() 获得的Flux<WebSocketMessage>,其每一次发射的数据WebSocketMessage如果是再Netty容器中,是一个对Netty中ByteBuf的保证,而ByteBuf在使用中有一点要注意,就是谁使用谁释放、retain()和release()成对出现。所以当把Flux<WebSocketMessage>发射的WebSocketMessage传递给其他方法使用时,注意要retain()增加一次计数,避免上一级方法release()使ByteBuf引用计数归零,导致过早回收。关于Netty的内存使用,下面会写一篇简要的介绍文章。

Mongo

MongoDB由于支持异步客户端,所以很适合在webflux项目中使用,spring-data-reactor也在第一时间做了支持。配合springboot的@EnableMongoAuditing注解,可以很快搭建异步mongo客户端。相关代码如下:

@SpringBootApplication
@EnableMongoAuditing
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}


@Component
@Slf4j
class DataInitializer implements CommandLineRunner {

    private final PostRepository posts;

    public DataInitializer(PostRepository posts) {
        this.posts = posts;
    }

    @Override
    public void run(String[] args) {
        log.info("start data initialization  ...");
        this.posts
          .deleteAll()
          .thenMany(
            Flux
              .just("bianzhaoyu", "xinan")
              .flatMap(
                name -> this.posts.save(Post.builder().name(name).age(25).build())
              )
          )
          .log()
          .subscribe(
            null,
            null,
            () -> log.info("done initialization...")
          );

    }

}

@RestController()
@RequestMapping(value = "/posts")
class PostController {

    private final PostRepository posts;

    public PostController(PostRepository posts) {
        this.posts = posts;
    }

    @GetMapping("")
    public Flux<Post> all() {
        return this.posts.findAll();
    }

    @PostMapping("")
    public Mono<Post> create(@RequestBody Post post) {
        return this.posts.save(post);
    }

    @GetMapping("/{id}")
    public Mono<Post> get(@PathVariable("id") String id) {
        return this.posts.findById(id);
    }

    @PutMapping("/{id}")
    public Mono<Post> update(@PathVariable("id") String id, @RequestBody Post post) {
        return this.posts.findById(id)
          .map(p -> {
              p.setName(post.getName());
              p.setAge(post.getAge());

              return p;
          })
          .flatMap(p -> this.posts.save(p));
    }

    @DeleteMapping("/{id}")
    public Mono<Void> delete(@PathVariable("id") String id) {
        return this.posts.deleteById(id);
    }

}

interface PostRepository extends ReactiveMongoRepository<Post, String> {
}

@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Post {

    @Id
    private String id;
    private String name;
    private Integer age;

    @CreatedDate
    private LocalDateTime createdDate;
}

配置如下:

spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017/blog
      grid-fs-database: images


<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
</dependencies>

Redis

异步Redis客户端的使用和普通Redis客户端类似,只是RedisTemplate的方法都原生支持了异步调用。使用时只要引入spring-boot-starter-data-redis-reactive依赖,并注册ReactiveRedisTemplate即可:

@Bean
    public ReactiveRedisTemplate<String, Post> reactiveJsonPostRedisTemplate(
      ReactiveRedisConnectionFactory connectionFactory) {
        
        RedisSerializationContext<String, Post> serializationContext = RedisSerializationContext
          .<String, Post>newSerializationContext(new StringRedisSerializer())
          .hashKey(new StringRedisSerializer())
          .hashValue(new Jackson2JsonRedisSerializer<>(Post.class))
          .build();
        return new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
    }

@Component
class PostRepository {

    ReactiveRedisOperations<String, Post> template;

    public PostRepository(ReactiveRedisOperations<String, Post> template) {
        this.template = template;
    }

    Flux<Post> findAll() {
        return template.<String, Post>opsForHash().values("posts");
    }

    Mono<Post> findById(String id) {
        return template.<String, Post>opsForHash().get("posts", id);
    }

    Mono<Post> save(Post post) {
        if (post.getId() != null) {
            String id = UUID.randomUUID().toString();
            post.setId(id);
        }
        return template.<String, Post>opsForHash().put("posts", post.getId(), post)
          .log()
          .map(p -> post);

    }

    Mono<Void> deleteById(String id) {
        return template.<String, Post>opsForHash().remove("posts", id)
          .flatMap(p -> Mono.<Void>empty());
    }

    Mono<Boolean> deleteAll() {
        return template.<String, Post>opsForHash().delete("posts");
    }

}

MySQL

mysql作为现在使用最广的数据存储工具,可以说是选择任何框架时必须考虑到兼容性的一点。但是遗憾的是,由于JDBC协议只支持同步访问,spring目前并没有直接对jdbc的reactor客户端的支持。虽然可以通过引入第三方异步数据库连接池,或者将普通jpa方法用Mono,Flux指定调用线程池的方式进行包装,但是作为关系型数据库最重要的一点:事务,却无法用@Transactional实现。虽然可以将一个事务的代码写在一个异步函数中,但却无法做到像同步方法那样,使用@Transactional各个业务方法,导致可复用性和实用性极低。这里使用一个异步jdbc线程池rxjava2-jdbc,相比与Mono/Flux包装的方式,rxjava2-jdbc在返回一个connection时是异步的,虽然由于jdbc协议的线程,执行sql语句的时候仍然是同步阻塞的。rxjava-jdbc内部维护了一个线程池用于执行阻塞代码,这也避免了我们自定义线程池的麻烦。
pom依赖:

        <dependency>
            <groupId>com.github.davidmoten</groupId>
            <artifactId>rxjava2-jdbc</artifactId>
            <version>0.1-RC23</version>
        </dependency>

代码如下:

/**
 * spring-data-jpa是同步的,repository返回的结果并不是Mono或者Flux形式。
 *     可以使用第三方异步jdbc连接池rxjava2-jdbc,但是由于每个方法是异步的,
 * 当数个异步方法组合起来时,并不能保证每个方法都是由一个线程按顺序调用的,
 * 这就使基于ThreadLocal的@Transactional无法使用
 *     当然,可以手动在一个异步方法中开启并提交事务,但是这还是失去了@Transactional组合
 * 不同方法到一个事物的便利性和可扩展性
 * @author xinan
 */
@Component
public class RxJava2PostRepository {
    private Database db;

    RxJava2PostRepository(Database db) {
        this.db = db;
    }

    public Observable<Post> findAll() {
        return this.db.select("select * from posts")
            .get(
                rs -> new Post(rs.getLong("id"),
                    rs.getString("name"),
                    rs.getInt("age")
                )
            )
            .toObservable();
    }

    public Single<Post> findById(Long id) {
        return this.db.select("select * from posts where id=?")
            .parameter(id)
            .get(
                rs -> new Post(rs.getLong("id"),
                    rs.getString("name"),
                    rs.getInt("age")
                )
            )
            .firstElement()
            .toSingle();
    }

    public Single<Integer> save(Post post) {
        return this.db.update("insert into posts(name, age) values(?, ?)")
            .parameters(post.getName(), post.getAge())
            .returnGeneratedKeys()
            .getAs(Integer.class)
            .firstElement()
            .toSingle();
    }

    String sql = "insert into posts(title, content) values(?, ?)";

    //使用事务
    public Single<Integer> saveTx(Post post) {
        return db.connection()
          .map(connection -> {
              connection.setAutoCommit(false);
              PreparedStatement pstmt = connection.prepareStatement(sql);
              pstmt.setInt(1, post.getAge());
              pstmt.setInt(2, post.getAge());
              int i = pstmt.executeUpdate();
              pstmt.close();
              connection.commit();
              return i;
          });
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容