基于Spring框架实现异步请求与异步调用

一、异步请求

1.1 同步请求与异步请求

首先看一下同步请求的线程执行模型:


image.png

接着看一下异步请求的线程执行模型:


image.png

异步请求可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放了容器所分配线程的请求,其响应将被延后,可以在耗时处理完成(例如长时间的运算)时再对客户端进行响应。

用户感受上的差别:

  • 异步请求是会一直等待response相应的,直到等到到返回结果再返回给客户
  • 异步调用我们往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心

两者的使用场景不同,

  • 异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;
  • 异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,比如同步日志到kafka中做日志分析等。

1.2 异步请求的实现方案

1.2.1 Servlet原生异步请求的实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * @Description Servlet方式实现异步请求
 * @Date 2020/3/20 12:49
 **/
@RestController
@RequestMapping("/servlet")
@Slf4j
public class AsyncServletController {

    @GetMapping("/async/request")
    public void asyncRequest(HttpServletRequest request, HttpServletResponse response) {
        AsyncContext asyncContext = request.startAsync();
        //设置监听器:可设置其开始、完成、异常、超时等事件的回调处理
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                log.error("超时了...");
                //做一些超时后的相关操作...
            }

            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                log.info("线程开始");
            }

            @Override
            public void onError(AsyncEvent event) throws IOException {
                log.error("发生错误:" + event.getThrowable());
            }

            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                //这里可以做一些清理资源的操作...
                log.info("执行完成");
            }
        });

        //设置超时时间
        asyncContext.setTimeout(5000);
        asyncContext.start(() -> {
            try {
                Thread.sleep(1000);
                log.info("内部线程:" + Thread.currentThread().getName());
                asyncContext.getResponse().setCharacterEncoding("utf-8");
                asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
                asyncContext.getResponse().getWriter().println("这是异步的请求返回");
            } catch (Exception e) {
                log.error("异常:" + e);
            }
            //异步请求完成通知
            //此时整个请求才完成
            asyncContext.complete();
        });
        //此时之类 request的线程连接已经释放了
        log.info("主线程:" + Thread.currentThread().getName());
    }
}

通过观察断点,发现当前的代码,onComplete是可以被正常回调执行的;如果将线程模拟执行的时候设置为10000ms,那么超时时间设置的是5000ms,那么将会发生执行超时,onTimeout将会被回调执行。但是无论如何onStartAsync都不会被执行到,这个是令我务必困惑的地方,有了结个中原因的读者可以给点提示。我目前使用的servelet版本是Servlet 3.0。

1.2.2 基于WebAsyncTask实现异步请求

在Callable外包一层,给WebAsyncTask设置一个超时回调,即可实现超时处理。

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.WebAsyncTask;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @Description
 * @Author louxiujun
 * @Date 2020/3/20 13:06
 **/
@RestController
@RequestMapping("/web")
@Slf4j
public class AsyncWebAsyncTaskController {

    @GetMapping("/test")
    @ResponseBody
    public WebAsyncTask<String> webAsyncReq() {

        log.debug("外部线程:" + Thread.currentThread().getName());
        Callable<String> result = () -> {
            log.debug("内部线程开始:" + Thread.currentThread().getName());

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }

            log.debug("副线程返回");
            log.debug("内部线程返回:" + Thread.currentThread().getName());

            return "success";
        };

        WebAsyncTask<String> wat = new WebAsyncTask<String>(3000L, result);
        wat.onTimeout(() -> {
            return "failed";
        });
        return wat;
    }
}

1.2.3 DeferredResult实现异步请求

DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Description 
 * @Date 2020/3/20 13:10
 **/
@RestController
@RequestMapping("/deferred")
@Slf4j
public class AsyncDeferredResultController {

    /**
     * 单线程的线程池
     */
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    @GetMapping("/test")
    @ResponseBody
    public DeferredResult<String> deferredResultReq() {
        log.info("外部线程:" + Thread.currentThread().getName());

        //设置超时时间
        DeferredResult<String> result = new DeferredResult<String>(60 * 1000L);

        //处理超时事件 采用委托机制
        result.onTimeout(() -> {
            log.error("DeferredResult超时");
            result.setResult("超时了!");
        });

        result.onCompletion(() -> {
            //完成后
            log.info("调用完成");
        });

        executorService.execute(() -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
            }
            //处理业务逻辑
            log.info("内部线程:" + Thread.currentThread().getName());

            //返回结果
            result.setResult("DeferredResult!!");
        });

        return result;
    }
}

1.2.4 本章小结

异步请求的优势在于增加了服务器对客户端请求的吞吐量。实际生产上我们用的比较少,实际生产环境下通过会选择负载均衡器+扩容机器来实现请求的均衡。

二、异步调用

通常在开发过程中,会遇到一个方法是和实际业务无关的,没有紧密性的。比如记录日志信息等业务。这个时候正常就是启一个新线程去做一些业务处理,让主线程异步的执行其他业务。

2.1 使用方式

  • 需要在启动类加入@EnableAsync使异步调用@Async注解生效
  • 在需要异步执行的方法上加入此注解即可@Async("threadPool"),threadPool为自定义线程池

特别说明:

  • 在默认情况下,未设置TaskExecutor时,默认是使用SimpleAsyncTaskExecutor这个线程池,但此线程不是真正意义上的线程池,因为线程不重用,每次调用都会创建一个新的线程。
  • 调用的异步方法,不能为同一个类的方法(包括同一个类的内部类)。

2.2 Demo

2.2.1 代码线程池

编写线程池配置类TaskExecutorConfig,其中@EnableAsync注解用于启用异步响应功能,@Configuration表明这是一个Bean的配置类,实现AsyncConfigurer接口来完成异步线程池相关的配置,在getAsyncExecutor方法中可以设置核心线程数、最大线程数等配置参数、线程工厂等参数。

此外,这里还自定义了一个线程工厂类MyThreadFactory用于指定新创建出来的线程的名称。

import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Description
 * @Author louxiujun
 * @Date 2020/3/20 14:24
 **/
@EnableAsync
@Configuration
@Slf4j
public class TaskExecutorConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //线程池大小
        taskExecutor.setCorePoolSize(5);
        //线程池最大线程数
        taskExecutor.setMaxPoolSize(10);
        //最大等待任务数
        taskExecutor.setQueueCapacity(25);
        // 设置自定义的线程池名称
        taskExecutor.setThreadFactory(new MyThreadFactory());
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }

    static class MyThreadFactory implements ThreadFactory {
        private AtomicInteger atomicInteger = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            int index = atomicInteger.incrementAndGet();
            log.debug("create no " + index + " thread");

            Thread t = new Thread(r, "AsyncThread-" + index);
            return t;
        }
    }
}

我们在创建一个service,用于模拟一个耗时操作,具体的执行方法上需要加上@Async注解用以告诉Spring容器这是一个异步方法,在实际执行的时候,需要将其抛到自定义的线程池中去执行:

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @Description
 * @Date 2020/3/20 14:29
 **/
@Service("asyncTaskService")
@Slf4j
public class AsyncTaskService {

    @Async
    public void excuteAsyncTaskTest(String name) {
        for (int i = 0; i < 5; i++) {
            log.info(Thread.currentThread().getName() + "正在执行异步任务" + name + i);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

最后,我们编写一个async相应的测试类AsyncController

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;


/**
 * @Description
 * @Date 2020/3/20 14:11
 **/
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController {

    @Autowired
    private AsyncTaskService asyncTaskService;

    @GetMapping("/test")
    @ResponseBody
    public String test() {
        // 异步处理的方法
        asyncTaskService.excuteAsyncTaskTest("my test");
        return "test";
    }
}

我们针对http://localhost:8080/async/test连续发出三次请求,页面均直接返回“test”的请求结果。

image.png

但是在后台控制台上,我们通过打印的日志信息,看到了线程池接连启用了三个线程池中的线程来执行实际的“耗时”请求。


image.png

2.2.2 xml配置的线程池

除了可以使用@Configuration的配置类的方式,我们还可以选择使用xml配置文件的方式来设置线程池相关的配置参数。

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
       http://www.springframework.org/schema/task
       http://www.springframework.org/schema/task/spring-task.xsd"
       default-autowire="byName">

    <!-- 定时器的线程池数量大小 -->
    <task:scheduler id="scheduler" pool-size="5"/>

    <!-- 任务线程池的数量大小,core size为5,max size为15,队列容量为5,达到总线程数时抛出异常、不执行 -->
    <task:executor id="executor" pool-size="5-15" queue-capacity="1000"/>

    <!-- 支持异步方法执行,作用等同于@EnableAsync注解.设置定时任务注解和executor任务 -->
    <task:annotation-driven executor="executor" scheduler="scheduler"/>
</beans>

在应用启动类Application上通过@importResource注解引用该xml文件:

@SpringBootApplication
@ImportResource({"classpath:bean-config.xml"})
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

将应用启动后,同样在页面连续发出三次请求,页面都是立即返回响应,但是后台的线程依然在执行,执行效果的截图如下:

image.png

2.3 本章小结

本小结介绍了关于异步响应的代码和xml配置的两种实现方式,可以根据大家通常的喜好来选择使用。

异步调用我们往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。因此,我们可以将一些耗时的操作使用@Async进行异步化,提升用户使用体验,但是与此同时所带来的的副作用也要考虑到:由于需要使用到线程池,会增加核心系统线程资源的开销,在已经存在大量多线程的情况下,再额外的增加使用线程池也未必是最好的解决方案。因此,在没有分布式系统的情况下,@Async确实能够较好的提升用户体验,实现耗时、复杂操作的异步化,但是对于大型分布式系统而言,使用消息队列来解决这一问题通常是更为普遍的做法,而且消息队列持久化的机制对于异常失败重试也能够提供更好的支持。因此,希望读者在使用异步调用的时候需要谨慎一些。

三、总结

本文主要介绍了Java中的异步请求和异步响应的含义、各自的实现方式、优缺点、试用的场景等。

四、参考资料

  1. SpringBoot中异步请求和异步调用(看这一篇就够了)
  2. 使用Spring的@Async解决界面耗时操作,增强用户的体验度
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • __block和__weak修饰符的区别其实是挺明显的:1.__block不管是ARC还是MRC模式下都可以使用,...
    LZM轮回阅读 3,278评论 0 6
  • 我可以为你提起笔, 如同为春天开的花为秋天下的雨。 为了你把美丽凝聚, 为了你把灵魂下坠。 但有一天春日不复, 有...
    墨之白兮阅读 327评论 0 5
  • 昨天情绪低落到崩溃,学业的节点,人生的节点,双重的迷茫积聚在一起,无力承担与解决的乏力感,充斥着自己的情绪。 经过...
    e8e976845a84阅读 333评论 4 2
  • 李云 焦点解决网络初级第19期坚持分享第五天,星期一 1.父母的改变从何开始?从关注解决问题开始,找到当下可以做到...
    构建幸福阅读 116评论 0 0
  • 总要离别 我知道这一刻总会到来 各自有各自的选择 今天我只是短暂离开一段时间了 感谢瑾瑾和女神送我到火车站 感谢大...
    口口h阅读 225评论 0 0