// The method may be on an interface, but we need attributes from the target class. // If the target class is null, the method will be unchanged. Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class. TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; // 如果函数加了注解,这里返回,可以把断点断到这里 }
// Second try is the transaction attribute on the target class. txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; }
if (specificMethod != method) { // Fallback is to look at the original method. txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // Last fallback is the class of the original method. txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } }
// Excerpt: prepares a ProxyFactory for the bean and decides between class-based
// proxying and interface-based proxying. The enclosing method header is not part
// of this excerpt.

if (this.beanFactory instanceof ConfigurableListableBeanFactory clbf) {
    AutoProxyUtils.exposeTargetClass(clbf, beanName, beanClass);
}

ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.copyFrom(this);

if (proxyFactory.isProxyTargetClass()) {
    // Explicit handling of JDK proxy targets and lambdas (for introduction advice scenarios)
    if (Proxy.isProxyClass(beanClass) || ClassUtils.isLambdaClass(beanClass)) {
        // Must allow for introductions; can't just set interfaces to the proxy's interfaces only.
        for (Class<?> ifc : beanClass.getInterfaces()) {
            proxyFactory.addInterface(ifc);
        }
    }
}
else {
    // No proxyTargetClass flag enforced, let's apply our default checks...
    if (shouldProxyTargetClass(beanClass, beanName)) {
        proxyFactory.setProxyTargetClass(true);
    }
    else {
        evaluateProxyInterfaces(beanClass, proxyFactory);
    }
}
// Excerpt: configures the CGLIB enhancer's callbacks and callback filter, then
// generates the proxy class (and, unless classOnly is set, an instance of it).
// 'enhancer' and 'classOnly' are defined outside this excerpt.
try {
    Class<?> rootClass = this.advised.getTargetClass();
    Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy");

    Callback[] callbacks = getCallbacks(rootClass);
    Class<?>[] types = new Class<?>[callbacks.length];
    for (int x = 0; x < types.length; x++) {
        types[x] = callbacks[x].getClass();
    }
    // fixedInterceptorMap only populated at this point, after getCallbacks call above
    ProxyCallbackFilter filter = new ProxyCallbackFilter(
            this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset);
    enhancer.setCallbackFilter(filter);
    enhancer.setCallbackTypes(types);

    // Generate the proxy class and create a proxy instance.
    // ProxyCallbackFilter has method introspection capability with Advisor access.
    try {
        // Step into this call when tracing proxy creation.
        return (classOnly ? createProxyClass(enhancer) : createProxyClassAndInstance(enhancer, callbacks));
    }
    finally {
        // Reduce ProxyCallbackFilter to key-only state for its class cache role
        // in the CGLIB$CALLBACK_FILTER field, not leaking any Advisor state...
        filter.advised.reduceToAdvisorKey();
    }
}
catch (CodeGenerationException | IllegalArgumentException ex) {
    throw new AopConfigException("Could not generate CGLIB subclass of " + this.advised.getTargetClass() +
            ": Common causes of this problem include using a final class or a non-visible class",
            ex);
}
catch (Throwable ex) {
    // TargetSource.getTarget() failed
    throw new AopConfigException("Unexpected AOP exception", ex);
}
} // closes the enclosing method, whose header is not part of this excerpt
// Excerpt: the two execution paths of the transactional around advice.
//  - Standard path: create the transaction, invoke the target, complete on failure,
//    then commit and return the result.
//  - Callback path: hand a TransactionCallback to a
//    CallbackPreferringPlatformTransactionManager.
// The enclosing method header is not part of this excerpt.
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager cpptm)) {
    // Standard transaction demarcation with getTransaction and commit/rollback calls.
    // This call obtains the connection and begins the transaction; worth stepping into.
    TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

    Object retVal;
    try {
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        retVal = invocation.proceedWithInvocation();
    }
    catch (Throwable ex) {
        // target invocation exception
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
    }
    finally {
        cleanupTransactionInfo(txInfo);
    }

    if (retVal != null && txAttr != null) {
        TransactionStatus status = txInfo.getTransactionStatus();
        if (status != null) {
            if (retVal instanceof Future<?> future && future.isDone()) {
                try {
                    future.get();
                }
                catch (ExecutionException ex) {
                    if (txAttr.rollbackOn(ex.getCause())) {
                        status.setRollbackOnly();
                    }
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
            else if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                // Set rollback-only in case of Vavr failure matching our rollback rules...
                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
        }
    }

    // Success path: commit and return. Without these statements and the closing
    // brace, the 'else' below would bind to the preceding inner 'if'.
    commitTransactionAfterReturning(txInfo);
    return retVal;
}

else {
    Object result;
    final ThrowableHolder throwableHolder = new ThrowableHolder();

    // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
    try {
        result = cpptm.execute(txAttr, status -> {
            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
            try {
                Object retVal = invocation.proceedWithInvocation();
                if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                    // Set rollback-only in case of Vavr failure matching our rollback rules...
                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
                return retVal;
            }
            catch (Throwable ex) {
                if (txAttr.rollbackOn(ex)) {
                    // A RuntimeException: will lead to a rollback.
                    if (ex instanceof RuntimeException runtimeException) {
                        throw runtimeException;
                    }
                    else {
                        throw new ThrowableHolderException(ex);
                    }
                }
                else {
                    // A normal return value: will lead to a commit.
                    throwableHolder.throwable = ex;
                    return null;
                }
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
        });
    }
    catch (ThrowableHolderException ex) {
        throw ex.getCause();
    }
    catch (TransactionSystemException ex2) {
        if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            ex2.initApplicationException(throwableHolder.throwable);
        }
        throw ex2;
    }
    catch (Throwable ex2) {
        if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        }
        throw ex2;
    }

    // Check result state: It might indicate a Throwable to rethrow.
    if (throwableHolder.throwable != null) {
        throw throwableHolder.throwable;
    }
    return result;
}
} // closes the enclosing method, whose header is not part of this excerpt
// Excerpt: decides how to obtain a transaction status. An existing transaction is
// delegated to handleExistingTransaction; otherwise the timeout is validated and
// the propagation behavior selects between failing, starting a new transaction,
// or returning an "empty" status. The enclosing method header is not part of
// this excerpt.
if (isExistingTransaction(transaction)) {
    // Existing transaction found -> check propagation behavior to find out how to behave.
    return handleExistingTransaction(def, transaction, debugEnabled);
}

// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}

// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    // These three propagation behaviors (not isolation levels) all start a new
    // transaction when none exists yet.
    SuspendedResourcesHolder suspendedResources = suspend(null);
    if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    }
    try {
        // Begins the transaction; step into this call to follow the flow.
        return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error ex) {
        resume(null, suspendedResources);
        throw ex;
    }
}
else {
    // Create "empty" transaction: no actual transaction, but potentially synchronization.
    if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                "isolation level will effectively be ignored: " + def);
    }
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
} // closes the enclosing method, whose header is not part of this excerpt
// Excerpt: the tail of a try block that switches the JDBC connection to manual
// commit, applies the timeout, and binds a newly created connection holder to
// the thread. The opening 'try {' and the enclosing method header are not part
// of this excerpt.

// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
    txObject.setMustRestoreAutoCommit(true);
    if (logger.isDebugEnabled()) {
        logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    }
    // Turn off auto-commit.
    con.setAutoCommit(false);
}

int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
    txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
    // Only a newly created holder is bound, keyed by the DataSource.
    TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} // closes the 'try' block opened before this excerpt

catch (Throwable ex) {
    // Release the connection only if this call created the holder.
    if (txObject.isNewConnectionHolder()) {
        DataSourceUtils.releaseConnection(con, obtainDataSource());
        txObject.setConnectionHolder(null, false);
    }
    throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
} // closes the enclosing method, whose header is not part of this excerpt
// Excerpt: builds the TransactionInfo for the current join point, attaches the
// transaction status when a transaction attribute is present, and always binds
// the TransactionInfo to the current thread. The enclosing method header is not
// part of this excerpt.
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
    // We need a transaction for this method...
    if (logger.isTraceEnabled()) {
        logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
    }
    // The transaction manager will flag an error if an incompatible tx already exists.
    txInfo.newTransactionStatus(status);
}
else {
    // The TransactionInfo.hasTransaction() method will return false. We created it only
    // to preserve the integrity of the ThreadLocal stack maintained in this class.
    if (logger.isTraceEnabled()) {
        logger.trace("No need to create transaction for [" + joinpointIdentification +
                "]: This method is not transactional.");
    }
}

// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
// Key step: the binding below goes through a ThreadLocal.
txInfo.bindToThread();
return txInfo;
} // closes the enclosing method, whose header is not part of this excerpt

/**
 * Holder to support the {@code currentTransactionStatus()} method,
 * and to support communication between different cooperating advices
 * (e.g. before and after advice) if the aspect involves more than a
 * single method (as will be the case for around advice).
 */
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
        new NamedThreadLocal<>("Current aspect-driven transaction");

/**
 * Makes this TransactionInfo the current one for the thread, remembering the
 * previously bound instance in {@code oldTransactionInfo}.
 */
private void bindToThread() {
    // Expose current TransactionStatus, preserving any existing TransactionStatus
    // for restoration after this transaction is complete.
    this.oldTransactionInfo = transactionInfoHolder.get();
    transactionInfoHolder.set(this);
}
// Original author's note: no remove() call was found for this ThreadLocal, so the
// connection would live as long as the thread does.
// NOTE(review): this holder stores a TransactionInfo, not a connection; the connection
// holder is bound separately through TransactionSynchronizationManager.bindResource.
// cleanupTransactionInfo(txInfo) runs in a finally block on the invocation path and
// presumably restores oldTransactionInfo - confirm before relying on the note above.
...... if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager cpptm)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 刚才在这里创建了数据库连接,开启了事务
Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); // 这里执行sql,这里就是aop的执行逻辑了,invocation.proceed(),开始执行我们的业务函数 } catch (Throwable ex) { // 业务执行出问题会走到这里,这里会回滚 // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); }
...... if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager cpptm)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 刚才在这里创建了数据库连接,开启了事务
Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); // 这里执行sql,这里就是aop的执行逻辑了,invocation.proceed(),开始执行我们的业务函数 } catch (Throwable ex) { // 业务执行出问题会走到这里,这里会回滚 // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); }
// ... (intervening code omitted in this excerpt)
// Success path of the around advice: follow the commit from here.
commitTransactionAfterReturning(txInfo);
return retVal;
}
} // the two braces above close blocks opened before this excerpt

/**
 * Commits the transaction described by {@code txInfo} through its transaction
 * manager. Does nothing when {@code txInfo} is {@code null} or carries no
 * transaction status.
 *
 * @param txInfo information about the current transaction, may be {@code null}
 */
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

/**
 * Entry point for committing. Rejects an already completed status, and rolls
 * back instead of committing when the status is marked rollback-only locally,
 * or globally while {@code shouldCommitOnGlobalRollbackOnly()} is false.
 *
 * @param status the status of the transaction to commit
 * @throws TransactionException declared by the method signature
 * @throws IllegalTransactionStateException if the status is already completed
 */
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        // Original author's note: rollback-only marks a transaction that must be
        // rolled back. In short, the flag can be set manually or be triggered by a
        // failing nested transaction; look up concrete scenarios for more depth.
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    // NOTE(review): 'unexpectedRollback' is not declared anywhere in the visible code,
    // so the excerpt appears to omit code between the check above and the check below
    // (possibly a different method) - confirm against the full source.

    // Throw UnexpectedRollbackException if we have a global rollback-only
    // marker but still didn't get a corresponding exception from commit.
    if (unexpectedRollback) {
        throw new UnexpectedRollbackException(
                "Transaction silently rolled back because it has been marked as rollback-only");
    }
}
}
// ... (code omitted in this excerpt; the surrounding braces do not balance on their own)
}

/**
 * Commits on the JDBC connection held by the transaction object of the given status.
 * An {@code SQLException} from the commit is rethrown via {@code translateException}.
 *
 * @param status the status whose transaction object holds the connection
 */
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
        // The actual JDBC commit.
        con.commit();
    }
    catch (SQLException ex) {
        throw translateException("JDBC commit", ex);
    }
}