在:https://www.jianshu.com/p/c8412579fe1a
不那么详细的分析了spring aop整个运转的流程,本篇文章再来稍微深入下,看看spring是如何通过aop实现Transaction的功能的。
在AbstractAutoProxyCreator中,会找到合适的Advisor从而对bean做增强:
//AbstractAutoProxyCreator
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
return bean;
}
if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
return bean;
}
if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
// Create proxy if we have advice.
// 查找到属于该bean的advisor
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
}
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
//AbstractAdvisorAutoProxyCreator
protected Object[] getAdvicesAndAdvisorsForBean(
Class<?> beanClass, String beanName, @Nullable TargetSource targetSource) {
// 获取合适的Advosor
List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName);
if (advisors.isEmpty()) {
return DO_NOT_PROXY;
}
return advisors.toArray();
}
protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
// 获取所有advisors
List<Advisor> candidateAdvisors = findCandidateAdvisors();
// 获取适用于该类的Advisor
List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
extendAdvisors(eligibleAdvisors);
if (!eligibleAdvisors.isEmpty()) {
eligibleAdvisors = sortAdvisors(eligibleAdvisors);
}
return eligibleAdvisors;
}
protected List<Advisor> findCandidateAdvisors() {
Assert.state(this.advisorRetrievalHelper != null, "No BeanFactoryAdvisorRetrievalHelper available");
return this.advisorRetrievalHelper.findAdvisorBeans();
}
//BeanFactoryAdvisorRetrievalHelper
public List<Advisor> findAdvisorBeans() {
// Determine list of advisor bean names, if not cached already.
String[] advisorNames = this.cachedAdvisorBeanNames;
if (advisorNames == null) {
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let the auto-proxy creator apply to them!
// 这个都很熟悉了吧 直接从bean容器中获取所有Advisor的实例
advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
this.beanFactory, Advisor.class, true, false);
this.cachedAdvisorBeanNames = advisorNames;
}
if (advisorNames.length == 0) {
return new ArrayList<>();
}
List<Advisor> advisors = new ArrayList<>();
for (String name : advisorNames) {
if (isEligibleBean(name)) {
if (this.beanFactory.isCurrentlyInCreation(name)) {
if (logger.isTraceEnabled()) {
logger.trace("Skipping currently created advisor '" + name + "'");
}
}
else {
try {
// 通过advisor的名称获取到对应的bean实例
advisors.add(this.beanFactory.getBean(name, Advisor.class));
}
catch (BeanCreationException ex) {
Throwable rootCause = ex.getMostSpecificCause();
if (rootCause instanceof BeanCurrentlyInCreationException) {
BeanCreationException bce = (BeanCreationException) rootCause;
String bceBeanName = bce.getBeanName();
if (bceBeanName != null && this.beanFactory.isCurrentlyInCreation(bceBeanName)) {
if (logger.isTraceEnabled()) {
logger.trace("Skipping advisor '" + name +
"' with dependency on currently created bean: " + ex.getMessage());
}
// Ignore: indicates a reference back to the bean we're trying to advise.
// We want to find advisors other than the currently created bean itself.
continue;
}
}
throw ex;
}
}
}
}
return advisors;
}
大体上获取Advisor就是这么个逻辑,先找出所有对应Advisor的类的beanName,再通过beanFactory.getBean方法获取这些bean并返回。
上面我们知道了在实现AOP的时候会找到所有的Advisor,而Transaction的一个主要的类BeanFactoryTransactionAttributeSourceAdvisor其实就是一个Advisor:
所以 在findCandidateAdvisors方法中,就会找到Transaction的核心类BeanFactoryTransactionAttributeSourceAdvisor,从而实现Transaction逻辑。
而判断是否需要Transaction逻辑其实是在findAdvisorsThatCanApply中实现的,大体的模式就是看bean的方法或者类上是否含有@Transaction,如果有则代表需要Transaction增强,关于findAdvisorsThatCanApply的代码,这里暂时按下不表,主要的逻辑在AbstractFallbackTransactionAttributeSource中的computeTransactionAttribute()方法中,关于整个调用链路可以自行跟踪下:
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow no-public methods as required.
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class.
// 方法中是否存在事务声明
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
// 查看方法所在类中是否存在事务声明
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
if (specificMethod != method) {
// Fallback is to look at the original method.
// 接口方法中是否存在事务声明
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
// 接口类中是否存在事务声明
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
Transaction如何通过AOP实现的
通过JDK动态代理做的增强,在调用类方法的时候,都会通过InvocationHandler的invoke方法来执行,而JdkDynamicAopProxy这个东西自己就是个InvocationHandler.,在执行代理类的目标方法时,会调用Advisor的getAdvice获取MethodInterceptor并执行其invoke方法,也就是在这个位置:
// ReflectiveMethodInvocation
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
return proceed();
}
}
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}
也就是在最后一个else中的return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);去执行的,不难看出这里通过责任链模式去执行所有MethodInterceptor的invoke方法。
说了这么多,这些跟Transaction有什么关系呢?
在之前提到的BeanFactoryTransactionAttributeSourceAdvisor中,它的getAdvice方法会返回TransactionInterceptor:
不难看出,它是个MethodInterceptor,所以TransactionInterceptor的invoke方法,就是实现事务的重要逻辑了。
TransactionInterceptor
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
来聚焦下invokeWithinTransaction方法:
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
// ...
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 如果需要的话 开启一个事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 执行目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 异常处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清空事务信息
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
方法很长,不过主要的逻辑还是比较清晰,可以看出其中比较关键的几个点:
- createTransactionIfNecessary (需要的话)开启事务
- invocation.proceedWithInvocation(); 执行目标方法
- completeTransactionAfterThrowing(txInfo, ex); 异常处理
- cleanupTransactionInfo(txInfo); 清空事务信息
- commitTransactionAfterReturning(txInfo); 提交事务
这些点放在以后分析。
connection
在createTransactionIfNecessary 中会有这么一段逻辑:
status = tm.getTransaction(txAttr);
这里往下其实会走到DataSourceTransactionManager的doGetTransaction方法:
protected Object doGetTransaction() {
DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
在ConnectionHolder 中,保存了一个Connection连接,而ConnectionHolder是通过(ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());获取的:
protected Object doGetTransaction() {
DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
return doGetResource(actualKey);
}
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
可以看出最终是从resources中获取的,而resources其实就是一个ThreadLocal:
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");