@Async 是通过Spring AOP代理实现的,应避免在同一类下直接调用,可使用AspectJ 代理调用同类方法。
启动异步调用
@EnableAsync
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
案列
一、无返回值
@Async
public void test1() throws InterruptedException{
Thread.sleep(1000);
System.out.println("----------方法1执行结束---------");
}
二、有返回值
@Async
public Future<String> test2() throws InterruptedException{
Thread.sleep(1000);
System.out.println("----------方法2执行结束---------");
return new AsyncResult<String>("请求成功!");
}
Future方法介绍
public interface Future<V> {
//可通过条件来取消任务
boolean cancel(boolean mayInterruptIfRunning);
//任务是否被取消
boolean isCancelled();
//任务是否执行完成
boolean isDone();
//获取执行结果(阻塞)
V get() throws InterruptedException, ExecutionException;
//规定时间内等待获取执行结果
V get(long timeout, TimeUnit unit);
}
推荐使用Java 8 引入的CompletableFuture
三、测试异步调用
@ActiveProfiles("debug")
@SpringBootTest(classes = Application.class)
public class AsyncTests {
@Resource
private AsyncService asyncService;
@Test
public void asyncTest() throws InterruptedException {
System.err.println("--------方法开始执行--------");
long a = System.currentTimeMillis();
asyncService.test1();
asyncService.test2();
long b = System.currentTimeMillis();
System.err.println("--------方法总耗时为:" + (end - start) + "毫秒");
// 主线程不要立刻结束,否则线程会立刻关闭,抛出sleep interrupted异常
Thread.sleep(3000);
}
}
定制化配置
默认情况下,@Async
使用 SimpleAsyncTaskExecutor
(不推荐),它每次调用都会创建新的线程,非常消耗资源,应使用线程池提来高程序性能。
@Slf4j
@EnableAsync
@Configuration
public class SpringAsyncConfig implements AsyncConfigurer {
/**
* 使用线程池提高程序性能
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 线程池的线程名的前缀。默认为 task-
threadPoolTaskExecutor.setThreadNamePrefix("default-async-task-");
// 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
threadPoolTaskExecutor.setCorePoolSize(15);
// 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
threadPoolTaskExecutor.setMaxPoolSize(30);
// 缓冲队列容量,用来缓冲执行任务的队列容量。默认为 Integer.MAX_VALUE 。
threadPoolTaskExecutor.setQueueCapacity(60);
// 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
threadPoolTaskExecutor.setKeepAliveSeconds(60);
// 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
// 拒绝策略--当线程池里的线程达到最大的线程数时如何处理新任务,默认为AbortPolicy。
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
/***
* 统一处理未捕获的异常
*
* 只会处理无返回值(void)的异步方法,不会处理有返回值(Future)的异步方法
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (Throwable ex, Method method, Object... params) -> {
log.error("异步执行任务失败,错误信息为:{},执行方法为:{},方法参数为:{}",ex.getMessage(), method.getName(), Arrays.toString(params)
);
};
}
}
配置拒绝策略
线程池RejectedExecutionHandler
属性的四种策略为:
AbortPolicy策略:默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
DiscardPolicy策略:如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
DiscardOldestPolicy策略:如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。
CallerRunsPolicy策略:如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。
优雅关停线程池
程序关闭的时候异步任务还在执行,会导致数据库连接池 这类对象被销毁,此时异步任务对数据库的操作就会被中断,导致入库失败。
// 主线程关闭时最大等待子线程执行的时间。
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
// 主线程关闭时是否等待子线程执行完毕。默认为 false 。
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
在配置文件中新建一个执行器
@Slf4j
@EnableAsync
@Configuration
public class SpringAsyncConfig implements AsyncConfigurer {
/**
* 新建一个执行器
*/
@Bean("databaseAsyncTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("database-async-task-");
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setMaxPoolSize(20);
threadPoolTaskExecutor.setQueueCapacity(200);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
// 主线程关闭时最大等待子线程执行的时间。
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
// 主线程关闭时是否等待子线程执行完毕。默认为 false 。
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
指定@Async
执行器
@Async("databaseAsyncTaskExecutor")