你必须懂也可以懂的@Async原理

摘要

最近和朋友,同事交流了一些关于技术人如何成长的话题。为什么聊到这个话题,因为程序员这个职业发展真的很快,2、3年的时间,相同起点的人可能就会被拉开很大差距,所以技术人一定要持续学习,保证一定的成长速度,才能跟上技术的更新和不断拍来的后浪。

成长体系

1.前言

想你在看这篇文章之前有过使用@Async注解进行任务异步处理的经历,在项目开发过程中,针对非主流程、非实时、耗时的任务,往往会进行异步处理,这样既不会影响主流程,还会提高主流程的响应时间。

在使用@Async注解进行异步处理的过程中,相信你也踩过不少的坑,比如:任务并没有异步执行,由于共用线程池导致任务之间相互影响、异步任务出现异常不知道如何处理等等。今天我将带着你去了解它的真面目,以便下次再遇到问题的时候可以游刃有余,不至于慌慌张张、无从下手。

2.探秘之旅

2.1 实现原理

2.1.1 寻找异步注解后置处理器

你应该知道,要想在项目中使用@Async注解来执行异步任务,需要我们手动去开启异步功能,开启的方式就是需要添加@EnableAsync

@SpringBootApplication
@EnableAsync
public class SpringBootAsyncApplication {

public static void main(String[] args) {
SpringApplication.run(SpringBootAsyncApplication.class, args);
}
}
复制代码

既然通过@EnableAsync注解可以开启异步功能,那么该注解就是我们探秘的入口

进入@EnableAsync注解,就会看到另一个熟悉的注解@Import,该注解的功能就是在程序中引入相关功能对应的配置类

@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {}
复制代码

点开AsyncConfigurationSelector,可以看到此次引入的是ProxyAsyncConfiguration配置类

public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
复制代码

进入ProxyAsyncConfiguration配置类

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
复制代码

可以看到ProxyAsyncConfiguration配置类中声明AsyncAnnotationBeanPostProcessor这样一个Bean,从字面意思也可以猜出该Bean应该就是异步处理的主角,接下来就来看看这个主角做了哪些工作

进入AsyncAnnotationBeanPostProcessor中,可以看到该类实现了BeanFactoryAware、BeanPostProcessor这两个与Bean生命周期息息相关的接口,由Bean的生命周期特性可以得知BeanFactoryAware接口的实现方法先于BeanPostProcessor接口的实现方法执行。

2.1.2 BeanFactoryAware实现

2.1.2.1 定义切面

@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);

// 定义切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
复制代码

在setBeanFactory()实现方法中定义了切面对象,看到切面这两个字,相信你的脑海中会立马浮现出与之有关的两个概念:切点、通知

  1. 切点:用来声明切入的目标
  2. 通知:针对切入目标的相应处理

2.1.3 定义切点

Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
复制代码 protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 定义在类上标注@Async、@Asynchronous注解的切点
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
// 定义在方法上标注@Async、@Asynchronous注解的切点
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
复制代码

2.1.4 定义通知

protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 定义通知
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
复制代码

通知就是最终要执行的,也是想当重要的一部分,既然很重要,那就需要我们来看看具体的实现

public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

// 获取异步任务线程池
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}

// 定义Callable对象
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
...
return null;
};

return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
复制代码 protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// 异步任务的返回值类型是CompletableFuture
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
// 异步任务的返回值类型是ListenableFuture
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
// 异步任务的返回值类型是Future
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
// 否则交由线程池来处理,没有返回值
else {
executor.submit(task);
return null;
}
}
复制代码

通知的具体实现如下:

  1. 第一步获取异步任务线程池,用来执行异步任务
  2. 使用Callable包裹目标方法
  3. 执行异步异步任务,根据不同的返回值类型做相应的处理

通过通知可以了解到异步任务最终实现的原理,你可能还有疑问,那就是如何告知通知来执行异步任务呢?

不知道,你是否还记得上文提到的BeanPostProcessor接口,下面就来看看它的具体实现

2.1.3 BeanPostProcessor实现

提到BeanPostProcessor接口,你就应该立刻意识到它的处理方法肯定对Bean做了某些处理,比如生成代理

有了基础的意识后就来看看此处对应的后置处理实现

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 判断当前Bean是否满足之前定义的切点,如果满足则生成代理对象
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}

// No proxy needed.
return bean;
}
复制代码

通过BeanPostProcessor的后置处理对满足切点的Bean生成代理,在调用目标方法的时候,会执行通知的invoke()方法

到此,异步实现原理部分就结束了,其实原理很简单。我们需要做的就是定义切点、通知;要想实现对目标方法的增强,自然而然想到的就是反向代理;最后就是如何对原有的Bean进行改变呢?此刻就需要联系到与Bean生命周期相关的BeanPostProcessor接口

2.2 线程池使用

线程池这部分还是相当重要的,使用不当可能会导致意向不到的问题发生,比如内存溢出、无限制创建线程、业务之间相互影响等等

* <p>By default, Spring will be searching for an associated thread pool definition: * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context, * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. 复制代码

根据官方文档的说明,可以得知Spring会从上下文中获取唯一的TaskExecutor或者名称"taskExecutor"的Bean作为线程池,默认的线程池在TaskExecutionAutoConfiguration自动配置类中定义,默认的线程池相关配置如下

可以看到默认线程池的队列大小和最大线程数都是Integer的最大值,显然会给系统留下一定的风险隐患,因此我们需要针对每个异步任务自定义线程池,然后在@Async()注解上指明对应线程池的Bean名称

2.3 异常处理

异步任务的异常处理默认情况下只会打印日志信息,不会做任何额外的额处理,官方文档也有相关的说明

Besides, annotated methods having a * {@code void} return type cannot transmit any exception back to the caller. By default, * such uncaught exceptions are only logged. 复制代码

SimpleAsyncUncaughtExceptionHandler就是异步任务异常处理的默认实现,如果想要自定义异常处理,只需要AsyncConfigurer接口即可

2.4 返回值类型

关于返回值类型,首先来看下官方的相关说明

* <p>In terms of target method signatures, any parameter types are supported. * However, the return type is constrained to either {@code void} or * {@link java.util.concurrent.Future}. In the latter case, you may declare the * more specific {@link org.springframework.util.concurrent.ListenableFuture} or * {@link java.util.concurrent.CompletableFuture} types which allow for richer * interaction with the asynchronous task and for immediate composition with * further processing steps. 复制代码

从官方说明上可以看出返回值类型只支持4种类型:

  1. void
  2. Future
  3. ListenableFuture
  4. CompletableFuture

在来看看具体的源码

不管从官方说明还是源码分析都可以得出异步任务只支持4种返回类型的结论,以后就不用再问别人String返回类型为什么返回的是null这样的傻瓜问题了

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343