一、异步请求
1.1 同步请求与异步请求
首先看一下同步请求的线程执行模型:
接着看一下异步请求的线程执行模型:
异步请求可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放了容器所分配线程的请求,其响应将被延后,可以在耗时处理完成(例如长时间的运算)时再对客户端进行响应。
用户感受上的差别:
- 异步请求是会一直等待response相应的,直到等到到返回结果再返回给客户
- 异步调用我们往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心
两者的使用场景不同,
- 异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;
- 异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,比如同步日志到kafka中做日志分析等。
1.2 异步请求的实现方案
1.2.1 Servlet原生异步请求的实现
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
* @Description Servlet方式实现异步请求
* @Date 2020/3/20 12:49
**/
@RestController
@RequestMapping("/servlet")
@Slf4j
public class AsyncServletController {
@GetMapping("/async/request")
public void asyncRequest(HttpServletRequest request, HttpServletResponse response) {
AsyncContext asyncContext = request.startAsync();
//设置监听器:可设置其开始、完成、异常、超时等事件的回调处理
asyncContext.addListener(new AsyncListener() {
@Override
public void onTimeout(AsyncEvent event) throws IOException {
log.error("超时了...");
//做一些超时后的相关操作...
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
log.info("线程开始");
}
@Override
public void onError(AsyncEvent event) throws IOException {
log.error("发生错误:" + event.getThrowable());
}
@Override
public void onComplete(AsyncEvent event) throws IOException {
//这里可以做一些清理资源的操作...
log.info("执行完成");
}
});
//设置超时时间
asyncContext.setTimeout(5000);
asyncContext.start(() -> {
try {
Thread.sleep(1000);
log.info("内部线程:" + Thread.currentThread().getName());
asyncContext.getResponse().setCharacterEncoding("utf-8");
asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
asyncContext.getResponse().getWriter().println("这是异步的请求返回");
} catch (Exception e) {
log.error("异常:" + e);
}
//异步请求完成通知
//此时整个请求才完成
asyncContext.complete();
});
//此时之类 request的线程连接已经释放了
log.info("主线程:" + Thread.currentThread().getName());
}
}
通过观察断点,发现当前的代码,onComplete
是可以被正常回调执行的;如果将线程模拟执行的时候设置为10000ms,那么超时时间设置的是5000ms,那么将会发生执行超时,onTimeout
将会被回调执行。但是无论如何onStartAsync
都不会被执行到,这个是令我务必困惑的地方,有了结个中原因的读者可以给点提示。我目前使用的servelet版本是Servlet 3.0。
1.2.2 基于WebAsyncTask实现异步请求
在Callable外包一层,给WebAsyncTask设置一个超时回调,即可实现超时处理。
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.WebAsyncTask;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* @Description
* @Author louxiujun
* @Date 2020/3/20 13:06
**/
@RestController
@RequestMapping("/web")
@Slf4j
public class AsyncWebAsyncTaskController {
@GetMapping("/test")
@ResponseBody
public WebAsyncTask<String> webAsyncReq() {
log.debug("外部线程:" + Thread.currentThread().getName());
Callable<String> result = () -> {
log.debug("内部线程开始:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
log.debug("副线程返回");
log.debug("内部线程返回:" + Thread.currentThread().getName());
return "success";
};
WebAsyncTask<String> wat = new WebAsyncTask<String>(3000L, result);
wat.onTimeout(() -> {
return "failed";
});
return wat;
}
}
1.2.3 DeferredResult实现异步请求
DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description
* @Date 2020/3/20 13:10
**/
@RestController
@RequestMapping("/deferred")
@Slf4j
public class AsyncDeferredResultController {
/**
* 单线程的线程池
*/
ExecutorService executorService = Executors.newSingleThreadExecutor();
@GetMapping("/test")
@ResponseBody
public DeferredResult<String> deferredResultReq() {
log.info("外部线程:" + Thread.currentThread().getName());
//设置超时时间
DeferredResult<String> result = new DeferredResult<String>(60 * 1000L);
//处理超时事件 采用委托机制
result.onTimeout(() -> {
log.error("DeferredResult超时");
result.setResult("超时了!");
});
result.onCompletion(() -> {
//完成后
log.info("调用完成");
});
executorService.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
//处理业务逻辑
log.info("内部线程:" + Thread.currentThread().getName());
//返回结果
result.setResult("DeferredResult!!");
});
return result;
}
}
1.2.4 本章小结
异步请求的优势在于增加了服务器对客户端请求的吞吐量。实际生产上我们用的比较少,实际生产环境下通过会选择负载均衡器+扩容机器来实现请求的均衡。
二、异步调用
通常在开发过程中,会遇到一个方法是和实际业务无关的,没有紧密性的。比如记录日志信息等业务。这个时候正常就是启一个新线程去做一些业务处理,让主线程异步的执行其他业务。
2.1 使用方式
- 需要在启动类加入@EnableAsync使异步调用@Async注解生效
- 在需要异步执行的方法上加入此注解即可@Async("threadPool"),threadPool为自定义线程池
特别说明:
- 在默认情况下,未设置TaskExecutor时,默认是使用SimpleAsyncTaskExecutor这个线程池,但此线程不是真正意义上的线程池,因为线程不重用,每次调用都会创建一个新的线程。
- 调用的异步方法,不能为同一个类的方法(包括同一个类的内部类)。
2.2 Demo
2.2.1 代码线程池
编写线程池配置类TaskExecutorConfig,其中@EnableAsync
注解用于启用异步响应功能,@Configuration
表明这是一个Bean的配置类,实现AsyncConfigurer
接口来完成异步线程池相关的配置,在getAsyncExecutor
方法中可以设置核心线程数、最大线程数等配置参数、线程工厂等参数。
此外,这里还自定义了一个线程工厂类MyThreadFactory
用于指定新创建出来的线程的名称。
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description
* @Author louxiujun
* @Date 2020/3/20 14:24
**/
@EnableAsync
@Configuration
@Slf4j
public class TaskExecutorConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//线程池大小
taskExecutor.setCorePoolSize(5);
//线程池最大线程数
taskExecutor.setMaxPoolSize(10);
//最大等待任务数
taskExecutor.setQueueCapacity(25);
// 设置自定义的线程池名称
taskExecutor.setThreadFactory(new MyThreadFactory());
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
static class MyThreadFactory implements ThreadFactory {
private AtomicInteger atomicInteger = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
int index = atomicInteger.incrementAndGet();
log.debug("create no " + index + " thread");
Thread t = new Thread(r, "AsyncThread-" + index);
return t;
}
}
}
我们在创建一个service,用于模拟一个耗时操作,具体的执行方法上需要加上@Async
注解用以告诉Spring容器这是一个异步方法,在实际执行的时候,需要将其抛到自定义的线程池中去执行:
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @Description
* @Date 2020/3/20 14:29
**/
@Service("asyncTaskService")
@Slf4j
public class AsyncTaskService {
@Async
public void excuteAsyncTaskTest(String name) {
for (int i = 0; i < 5; i++) {
log.info(Thread.currentThread().getName() + "正在执行异步任务" + name + i);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最后,我们编写一个async相应的测试类AsyncController
:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @Description
* @Date 2020/3/20 14:11
**/
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController {
@Autowired
private AsyncTaskService asyncTaskService;
@GetMapping("/test")
@ResponseBody
public String test() {
// 异步处理的方法
asyncTaskService.excuteAsyncTaskTest("my test");
return "test";
}
}
我们针对http://localhost:8080/async/test
连续发出三次请求,页面均直接返回“test”的请求结果。
但是在后台控制台上,我们通过打印的日志信息,看到了线程池接连启用了三个线程池中的线程来执行实际的“耗时”请求。
2.2.2 xml配置的线程池
除了可以使用@Configuration
的配置类的方式,我们还可以选择使用xml配置文件的方式来设置线程池相关的配置参数。
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd"
default-autowire="byName">
<!-- 定时器的线程池数量大小 -->
<task:scheduler id="scheduler" pool-size="5"/>
<!-- 任务线程池的数量大小,core size为5,max size为15,队列容量为5,达到总线程数时抛出异常、不执行 -->
<task:executor id="executor" pool-size="5-15" queue-capacity="1000"/>
<!-- 支持异步方法执行,作用等同于@EnableAsync注解.设置定时任务注解和executor任务 -->
<task:annotation-driven executor="executor" scheduler="scheduler"/>
</beans>
在应用启动类Application上通过@importResource注解引用该xml文件:
@SpringBootApplication
@ImportResource({"classpath:bean-config.xml"})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
将应用启动后,同样在页面连续发出三次请求,页面都是立即返回响应,但是后台的线程依然在执行,执行效果的截图如下:
2.3 本章小结
本小结介绍了关于异步响应的代码和xml配置的两种实现方式,可以根据大家通常的喜好来选择使用。
异步调用我们往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。因此,我们可以将一些耗时的操作使用@Async进行异步化,提升用户使用体验,但是与此同时所带来的的副作用也要考虑到:由于需要使用到线程池,会增加核心系统线程资源的开销,在已经存在大量多线程的情况下,再额外的增加使用线程池也未必是最好的解决方案。因此,在没有分布式系统的情况下,@Async
确实能够较好的提升用户体验,实现耗时、复杂操作的异步化,但是对于大型分布式系统而言,使用消息队列来解决这一问题通常是更为普遍的做法,而且消息队列持久化的机制对于异常失败重试也能够提供更好的支持。因此,希望读者在使用异步调用的时候需要谨慎一些。
三、总结
本文主要介绍了Java中的异步请求和异步响应的含义、各自的实现方式、优缺点、试用的场景等。