Spring Async支持(源码学习)

下面先来看看@Aysnc注解类的说明

package org.springframework.scheduling.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 标记一个方法或者类支持异步执行
 *
 * 异步执行的方法参数没有限制,返回值限制使用void或java.util.concurent.Future
 * 使用Future,可以声明更具体的实现,例如
 * org.springframework.util.concurrent.ListenableFuture or
 * java.util.concurrent.CompletableFuture
 * 这样可以与异步任务的结果进行更进一步的处理。

 *  代理方法返回的Future 是一个异步对象的Future ,这样的话目标方法就得有相同的方法声明
 *  可以返回一个临时的 Future句柄,例如Spring的AsyncResult 或者EJB3.1的 javax.ejb.AsyncResult
*  或者 java.util.concurrent.CompletableFuture#completedFuture(Object)
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @since 3.0
 * @see AnnotationAsyncExecutionInterceptor 具体的增强,拦截后异步执行。
 * @see AsyncAnnotationAdvisor 顾问,方法切面,包含了增强和切入点。
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

    /**
     * 可以指定一个执行器Executor 或者Spring的TaskExecutor
         * 如果不指定,日志中会提示.s.a.AnnotationAsyncExecutionInterceptor : No TaskExecutor bean found for async processing ,会使用默认执行器为SimpleAsyncTaskExecutor
     */
    String value() default "";

}

注解顾问,也就是Spring增强实现,包含了增强和切入点

package org.springframework.scheduling.annotation;

import java.lang.annotation.Annotation;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;

import org.aopalliance.aop.Advice;

import org.springframework.aop.Pointcut;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.aop.support.AbstractPointcutAdvisor;
import org.springframework.aop.support.ComposablePointcut;
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * Advisor that activates asynchronous method execution through the {@link Async}
 * annotation. This annotation can be used at the method and type level in
 * implementation classes as well as in service interfaces.
 *
 * <p>This advisor detects the EJB 3.1 {@code javax.ejb.Asynchronous}
 * annotation as well, treating it exactly like Spring's own {@code Async}.
 * Furthermore, a custom async annotation type may get specified through the
 * {@link #setAsyncAnnotationType "asyncAnnotationType"} property.
 *
 * @author Juergen Hoeller
 * @since 3.0
 * @see org.springframework.dao.annotation.PersistenceExceptionTranslationAdvisor
 * @see org.springframework.stereotype.Repository
 * @see org.springframework.dao.DataAccessException
 * @see org.springframework.dao.support.PersistenceExceptionTranslator
 */
@SuppressWarnings("serial")
//相当于一个切面 ,包含了切入点pointcut和增强advice
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
        //异常处理handler,默认实现输出日志 SimpleAsyncUncaughtExceptionHandler
    private AsyncUncaughtExceptionHandler exceptionHandler;
        //通知,查看buildAdvice,使用AnnotationAsyncExecutionInterceptor,用于拦截添加Async注解的方法
        //放到线程池中执行。
    private Advice advice;
        //切入点,查看buildPointcut,返回一个复合切入点ComposablePointcut 包含类和方法的切入点
    private Pointcut pointcut;


    /**
     * Create a new {@code AsyncAnnotationAdvisor} for bean-style configuration.
     */
    public AsyncAnnotationAdvisor() {
        this(null, null);
    }

    /**
     * Create a new {@code AsyncAnnotationAdvisor} for the given task executor.
     * @param executor the task executor to use for asynchronous methods
     * (can be {@code null} to trigger default executor resolution)
     * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use to
     * handle unexpected exception thrown by asynchronous method executions
     * @see AnnotationAsyncExecutionInterceptor#getDefaultExecutor(BeanFactory)
     */
    @SuppressWarnings("unchecked")
    public AsyncAnnotationAdvisor(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(2);
        asyncAnnotationTypes.add(Async.class);
        try {
            asyncAnnotationTypes.add((Class<? extends Annotation>)
                    ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
        }
        catch (ClassNotFoundException ex) {
            // If EJB 3.1 API not present, simply ignore.
        }
        if (exceptionHandler != null) {
            this.exceptionHandler = exceptionHandler;
        }
        else {
            this.exceptionHandler = new SimpleAsyncUncaughtExceptionHandler();
        }
                //构建增强,参数是一个执行器,和异常处理类。
        this.advice = buildAdvice(executor, this.exceptionHandler);
                //构建切入点
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }


    /**
     * Specify the default task executor to use for asynchronous methods.
     */
    public void setTaskExecutor(Executor executor) {
        this.advice = buildAdvice(executor, this.exceptionHandler);
    }

    /**
     * Set the 'async' annotation type.
     * <p>The default async annotation type is the {@link Async} annotation, as well
     * as the EJB 3.1 {@code javax.ejb.Asynchronous} annotation (if present).
     * <p>This setter property exists so that developers can provide their own
     * (non-Spring-specific) annotation type to indicate that a method is to
     * be executed asynchronously.
     * @param asyncAnnotationType the desired annotation type
     */
    public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
        Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<Class<? extends Annotation>>();
        asyncAnnotationTypes.add(asyncAnnotationType);
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

    /**
     * Set the {@code BeanFactory} to be used when looking up executors by qualifier.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        if (this.advice instanceof BeanFactoryAware) {
            ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
        }
    }


    @Override
    public Advice getAdvice() {
        return this.advice;
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }


    protected Advice buildAdvice(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
        return new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler);
    }

    /**
     * Calculate a pointcut for the given async annotation types, if any.
     * @param asyncAnnotationTypes the async annotation types to introspect
     * @return the applicable Pointcut object, or {@code null} if none
     */
    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
        ComposablePointcut result = null;
        for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType);
            if (result == null) {
                result = new ComposablePointcut(cpc);
            }
            else {
                result.union(cpc);
            }
            result = result.union(mpc);
        }
        return result;
    }

}

增强拦截器,主要实现功能在父类,本类实现了getExecutorQualifier,也就是获取@Async注解中value的值,看有没有指定的执行器Executor

package org.springframework.scheduling.annotation;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncExecutionInterceptor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.core.annotation.AnnotatedElementUtils;

/**
 * Specialization of {@link AsyncExecutionInterceptor} that delegates method execution to
 * an {@code Executor} based on the {@link Async} annotation. Specifically designed to
 * support use of {@link Async#value()} executor qualification mechanism introduced in
 * Spring 3.1.2. Supports detecting qualifier metadata via {@code @Async} at the method or
 * declaring class level. See {@link #getExecutorQualifier(Method)} for details.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1.2
 * @see org.springframework.scheduling.annotation.Async
 * @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
 */
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {

    /**
     * Create a new {@code AnnotationAsyncExecutionInterceptor} with the given executor
     * and a simple {@link AsyncUncaughtExceptionHandler}.
     * @param defaultExecutor the executor to be used by default if no more specific
     * executor has been qualified at the method level using {@link Async#value()};
     * as of 4.2.6, a local executor for this interceptor will be built otherwise
     */
    public AnnotationAsyncExecutionInterceptor(Executor defaultExecutor) {
        super(defaultExecutor);
    }

    /**
     * Create a new {@code AnnotationAsyncExecutionInterceptor} with the given executor.
     * @param defaultExecutor the executor to be used by default if no more specific
     * executor has been qualified at the method level using {@link Async#value()};
     * as of 4.2.6, a local executor for this interceptor will be built otherwise
     * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use to
     * handle exceptions thrown by asynchronous method executions with {@code void}
     * return type
     */
    public AnnotationAsyncExecutionInterceptor(Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        super(defaultExecutor, exceptionHandler);
    }


    /**
     * Return the qualifier or bean name of the executor to be used when executing the
     * given method, specified via {@link Async#value} at the method or declaring
     * class level. If {@code @Async} is specified at both the method and class level, the
     * method's {@code #value} takes precedence (even if empty string, indicating that
     * the default executor should be used preferentially).
     * @param method the method to inspect for executor qualifier metadata
     * @return the qualifier if specified, otherwise empty string indicating that the
     * {@linkplain #setExecutor(Executor) default executor} should be used
     * @see #determineAsyncExecutor(Method)
     */
    @Override
    protected String getExecutorQualifier(Method method) {
        // Maintainer's note: changes made here should also be made in
        // AnnotationAsyncExecutionAspect#getExecutorQualifier
                //获取方法上的注解
        Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
        if (async == null) {
                        //获取类上的注解
            async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
        }
               //返回@Aysnc注解中指定的value值
        return (async != null ? async.value() : null);
    }

}

增强拦截器的父类,主要看invoke方法,转换拦截的方法为Callable对象。提交到线程执行器中执行


package org.springframework.aop.interceptor;

import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.Ordered;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.ClassUtils;

/**
 * AOP Alliance {@code MethodInterceptor} that processes method invocations
 * asynchronously, using a given {@link org.springframework.core.task.AsyncTaskExecutor}.
 * Typically used with the {@link org.springframework.scheduling.annotation.Async} annotation.
 *
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@code java.util.concurrent.Future}. In the latter case, the Future handle
 * returned from the proxy will be an actual asynchronous Future that can be used
 * to track the result of the asynchronous method execution. However, since the
 * target method needs to implement the same signature, it will have to return
 * a temporary Future handle that just passes the return value through
 * (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult}
 * or EJB 3.1's {@code javax.ejb.AsyncResult}).
 *
 * <p>When the return type is {@code java.util.concurrent.Future}, any exception thrown
 * during the execution can be accessed and managed by the caller. With {@code void}
 * return type however, such exceptions cannot be transmitted back. In that case an
 * {@link AsyncUncaughtExceptionHandler} can be registered to process such exceptions.
 *
 * <p>As of Spring 3.1.2 the {@code AnnotationAsyncExecutionInterceptor} subclass is
 * preferred for use due to its support for executor qualification in conjunction with
 * Spring's {@code @Async} annotation.
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.0
 * @see org.springframework.scheduling.annotation.Async
 * @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
 * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
 */
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

    /**
     * Create a new instance with a default {@link AsyncUncaughtExceptionHandler}.
     * @param defaultExecutor the {@link Executor} (typically a Spring {@link AsyncTaskExecutor}
     * or {@link java.util.concurrent.ExecutorService}) to delegate to;
     * as of 4.2.6, a local executor for this interceptor will be built otherwise
     */
    public AsyncExecutionInterceptor(Executor defaultExecutor) {
        super(defaultExecutor);
    }

    /**
     * Create a new {@code AsyncExecutionInterceptor}.
     * @param defaultExecutor the {@link Executor} (typically a Spring {@link AsyncTaskExecutor}
     * or {@link java.util.concurrent.ExecutorService}) to delegate to;
     * as of 4.2.6, a local executor for this interceptor will be built otherwise
     * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use
     */
    public AsyncExecutionInterceptor(Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        super(defaultExecutor, exceptionHandler);
    }


    /**
     * Intercept the given method invocation, submit the actual calling of the method to
     * the correct task executor and return immediately to the caller.
     * @param invocation the method to intercept and make asynchronous
     * @return {@link Future} if the original method returns {@code Future}; {@code null}
     * otherwise.
     */
    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
               //根据用户声明的@Async中声明的value在bean工厂中查找执行器,如果找不到则抛异常。
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }
                //转换为异步对象
        Callable<Object> task = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                                        //看返回值是否是Future
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                                        //异常处理,如果是Futrue对象则抛出,其他则调用AsyncUncaughtExceptionHandler的handleUncaughtException,默认实现是打印错误日志。
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            }
        };
                 //提交到线程池执行,在父类中实现
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

    /**
     * This implementation is a no-op for compatibility in Spring 3.1.2.
     * Subclasses may override to provide support for extracting qualifier information,
     * e.g. via an annotation on the given method.
     * @return always {@code null}
     * @since 3.1.2
     * @see #determineAsyncExecutor(Method)
     */
    @Override
    protected String getExecutorQualifier(Method method) {
        return null;
    }

    /**
        * 在Bean工厂查找TaskExecutor的实现,或者名称为taskExecutor的实例,如果没有则创建一个
         * SimpleAsyncTaskExecutor
     * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
     * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
     * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
     * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
     * for local use if no default could be found.
     * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
     */
    @Override
    protected Executor getDefaultExecutor(BeanFactory beanFactory) {
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335