欢迎访问shiker.tech

请允许在我们的网站上展示广告

您似乎使用了广告拦截器,请关闭广告拦截器。我们的网站依靠广告获取资金。

一文读懂Spring事务源码
(last modified Sep 1, 2024, 8:22 PM )
by
侧边栏壁纸
  • 累计撰写 181 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

一文读懂Spring事务源码

橙序员
2024-09-01 / 0 评论 / 0 点赞 / 78 阅读 / 4,682 字 / 正在检测百度是否收录... 正在检测必应是否收录...
文章摘要(AI生成)

Spring事务源码详解主要介绍了在不使用事务注解时,基于jdbc使用事务的步骤,以及通过注解封装事务的方法。同时,还提到了用户线程池和数据库连接池之间可能遇到的多个事务和事务嵌套的情况,以及Spring定义的事务传播机制来解决这些问题。文章还介绍了Spring事务切面的创建过程,包括创建事务属性解析器和获取事务管理器,用户需要根据不同的数据源自己创建事务管理器。最后,还介绍了创建事务拦截器对事务注解方法进行拦截的操作。文章内容详实,对Spring事务的实现机制有很好的解释和说明。


首先,在开始之前我们先想一件事情,如果让我们来为代码封装事务注解,我们需要如果做。在不使用事务注解的情况下,我们想要基于jdbc使用事务,那么需要做以下步骤:

  1. 通过JDBC创建数据库连接池
  2. 在执行事务操作的方法中首先获取数据库连接
  3. 将数据库连接自动提交设为关闭
  4. 执行我们的CRUD操作,并对其进行try catch
  5. 如果有异常,则进行rollback;否则进行commit

那么通过注解封装事务,则是要将除了我们CRUD之外的其他操作封装到切面中,简化用户的使用成本,而且在实际使用过程中,我们需要考虑用户线程池和数据库连接池之间的配置管理,即用户线程获取事务注解可能会遇到以下几种情况:

  1. 多个事务:当前用户线程中已有一个数据库连接在执行事务操作
  2. 事务嵌套:用户在使用过程中对嵌套方法同时使用了事务注解

针对上述可能会出现的情况,spring又定义了事务的传播机制来提供不同的处理策略解决以上问题。下面我们就来看下spring事务是如何实现的吧.

创建事务切面

事务的实现机制与其他spring组件相同,即配置源获取和具体执行。配置源获取需要对应的事务注解的解析器进行,具体执行则要通过事务管理器进行执行,所以创建事务切面包括了两个操作:

  1. 创建事务属性解析器:解析方法上的事务注解,获取对应的事务属性

  2. 获取事务管理器:管理事务生命周期,包括开启、挂起、恢复、提交和回滚

    事务管理器的创建是用户根据不同的数据源自己创建的,例如:

      @Bean
      @Override
      public PlatformTransactionManager annotationDrivenTransactionManager() {
          return new DataSourceTransactionManager(dataSource());
      }
    
  3. 创建事务拦截器:对事务注解的方法进行拦截,在事务中执行该方法

对应源码为:

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
          TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
        //创建事务切面
       BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
       advisor.setTransactionAttributeSource(transactionAttributeSource);
       advisor.setAdvice(transactionInterceptor);
       if (this.enableTx != null) {
          advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
       }
       return advisor;
    }

    //事务属性源
    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
       //AnnotationTransactionAttributeSource父类AbstractFallbackTransactionAttributeSource
       return new AnnotationTransactionAttributeSource();
    }

    //事务拦截器
    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
       TransactionInterceptor interceptor = new TransactionInterceptor();
       interceptor.setTransactionAttributeSource(transactionAttributeSource);
       //设置事务管理器
       if (this.txManager != null) {
          interceptor.setTransactionManager(this.txManager);
       }
       return interceptor;
    }

}

拦截事务方法

事务拦截方法具体逻辑在事务拦截器TransactionInterceptor的父类TransactionAspectSupport中实现,方法invokeWithinTransaction具体逻辑如下:

image-20240901182157787

事务处理逻辑

这里我们响应式编程和websphere中的事务拦截先排除,在传统应用开发中,拦截事务的具体逻辑为:

//当前线程的事务信息
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
       new NamedThreadLocal<>("Current aspect-driven transaction");

@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
       final InvocationCallback invocation) throws Throwable {

    // 事务属性为空,那么方法就是非事务
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 获取事务属性
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 获取事务管理器
    final TransactionManager tm = determineTransactionManager(txAttr);
    
    //ReactiveTransactionManager:则是为响应式编程模型设计的,适用于非阻塞的异步环境。它支持响应式流,能够在高并发情况下有效管理事务,适合处理如 R2DBC 等响应式数据访问技术。
    //响应式事务处理
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
        ... ...
    }
    
    //PlatformTransactionManager:主要用于传统的命令式编程模型,适用于基于线程的环境。它负责管理事务的开始、提交和回滚,通常用于处理阻塞的 I/O 操作,如 JDBC 和 JPA。
    //非响应式事务
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    //如果事务属性为空或着事务非回调性事务
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
       // Standard transaction demarcation with getTransaction and commit/rollback calls.
       TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

       Object retVal;
       try {
          // This is an around advice: Invoke the next interceptor in the chain.
          // This will normally result in a target object being invoked.
          retVal = invocation.proceedWithInvocation();
       }
       catch (Throwable ex) {
          // target invocation exception
          completeTransactionAfterThrowing(txInfo, ex);
          throw ex;
       }
       finally {
          cleanupTransactionInfo(txInfo);
       }

       if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
          // Set rollback-only in case of Vavr failure matching our rollback rules...
          TransactionStatus status = txInfo.getTransactionStatus();
          if (status != null && txAttr != null) {
             retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
          }
       }

       commitTransactionAfterReturning(txInfo);
       return retVal;
    }
    
    else {
        ... ...
    }
}    
 

解析事务注解

在获取事务属性时,我们通过以下调用链可以找到具体的注解解析调用过程:

AbstractFallbackTransactionAttributeSource.getTransactionAttribute
|--->AbstractFallbackTransactionAttributeSource.computeTransactionAttribute
     |--->AnnotationTransactionAttributeSource.findTransactionAttribute
     	  |--->SpringTransactionAnnotationParser.parseTransactionAnnotation

而在SpringTransactionAnnotationParser中,可以看到具体解析了哪些属性:

protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
    //事务属性
    RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
    //事务传播机制
    Propagation propagation = attributes.getEnum("propagation");
    rbta.setPropagationBehavior(propagation.value());
    //事务隔离级别
    Isolation isolation = attributes.getEnum("isolation");
    rbta.setIsolationLevel(isolation.value());
    //事务超时事件
    rbta.setTimeout(attributes.getNumber("timeout").intValue());
    //是否只读事务
    rbta.setReadOnly(attributes.getBoolean("readOnly"));
    //事务别名
    rbta.setQualifier(attributes.getString("value"));
    //回滚机制
    List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
    for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
       rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
       rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
       rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
       rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    rbta.setRollbackRules(rollbackRules);

    return rbta;
}

创建开启事务

创建开启事务(拦截事务方法所属类TransactionAspectSupportcreateTransactionIfNecessary方法)即是为我们当前事务方法创建并能绑定对应的事务信息,那么事务信息要包含哪些元素呢?

事务信息

事务信息由事务管理器、事务属性、事务状态和方法之前事务信息组成。而事务状态则包含了底层事务、是否新事务、新事务同步状态、事务只读、事务其他资源等信息。通过一张图来了解即为:

image-20240901184338258

事务状态由事务管理器创建,通过事务管理器获取到对应的事务状态之后便可以创建新的事务信息,并将其绑定到当前事务方法所在的线程上。

事务信息设置

设置事务信息,即设置对应的事务属性、事务状态等信息,并将事务信息绑定到当前事务方法所在的线程上。事务属性在注解解析时已经获得,我们需要关注的即是事务状态的的获取。

//当前线程的事务信息
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
       new NamedThreadLocal<>("Current aspect-driven transaction");

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
		@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

	// 没有指定事务名称,则以事务所在方法为事务明明
	if (txAttr != null && txAttr.getName() == null) {
		txAttr = new DelegatingTransactionAttribute(txAttr) {
			@Override
			public String getName() {
				return joinpointIdentification;
			}
		};
	}
	//获取事务状态
	TransactionStatus status = null;
	if (txAttr != null) {
		if (tm != null) {
			status = tm.getTransaction(txAttr);
		}
		else {
			if (logger.isDebugEnabled()) {
				logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
							"] because no transaction manager has been configured");
			}
		}
	}
	return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
       @Nullable TransactionAttribute txAttr, String joinpointIdentification,
       @Nullable TransactionStatus status) {

    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
       ... ...
       txInfo.newTransactionStatus(status);
    }
    else {
       ... ... 
    }
    txInfo.bindToThread();
    return txInfo;
}

事务状态获取

事务状态的获取流程比较复杂,在AbstractPlatformTransactionManagergetTransaction方法中,我们通过流程图来看下具体流程:

image-20240901192351610

对应源码为:

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
       throws TransactionException {

    // 没有事务定义信息则使用默认事务信息
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
	//获取事务
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
	//是否已经存在事务
    if (isExistingTransaction(transaction)) {
       // 找到现有事务 -> 检查传播行为以了解如何操作。
       return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // 检查事务超时设置。
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
       throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // 未找到现有事务 -> 检查传播行为以了解如何操作。
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
       throw new IllegalTransactionStateException(
             "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
          def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
          def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
       // 当时传播行为时PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED,挂起事务同步资源,创建新事物
       SuspendedResourcesHolder suspendedResources = suspend(null);
       if (debugEnabled) {
          logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
       }
       try {
          return startTransaction(def, transaction, debugEnabled, suspendedResources);
       }
       catch (RuntimeException | Error ex) {
          //如果创建异常,则恢复挂起资源
          resume(null, suspendedResources);
          throw ex;
       }
    }
    else {
       // 创建 “空” 事务:没有实际的事务,但可能会同步。
       if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
          logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                "isolation level will effectively be ignored: " + def);
       }
       boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
       return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

这里看源码时也可以复习一下事务的传播行为:

  1. REQUIRED
  • 描述:如果当前没有事务,则启动一个新事务。如果当前已经有一个事务,则加入这个事务。
  • 典型场景:这是最常用的传播行为,适用于需要在已有事务中执行操作的情况。
  1. REQUIRES_NEW
  • 描述:无论当前是否有事务,总是启动一个新事务。当前事务(如果存在)将被挂起,直到新事务完成。
  • 典型场景:用于需要独立提交的操作,不希望当前事务的回滚影响此操作。
  1. SUPPORTS
  • 描述:如果当前有事务,则加入该事务;如果当前没有事务,则以非事务方式执行。
  • 典型场景:适用于既可以在事务中执行也可以在非事务中执行的操作。
  1. NOT_SUPPORTED
  • 描述:总是在非事务环境中执行。如果当前有事务,则将其挂起,直到该操作完成。
  • 典型场景:适用于不希望操作在事务中执行的情况。
  1. MANDATORY
  • 描述:必须在一个已有的事务中执行,如果当前没有事务,则抛出异常。
  • 典型场景:用于必须在事务上下文中运行的代码。
  1. NEVER
  • 描述:总是在非事务环境中执行,如果当前有事务存在,则抛出异常。
  • 典型场景:适用于不希望在事务中执行的操作。
  1. NESTED
  • 描述:如果当前有事务,则在当前事务中启动一个嵌套事务(该嵌套事务有单独的提交和回滚)。如果当前没有事务,则行为类似于 REQUIRED
  • 典型场景:适用于需要部分回滚的复杂业务逻辑

上述方法中只是展示了REQUIREDREQUIRES_NEWPROPAGATION_NESTEDNEVER部分实现逻辑,在检查到当前方法有其他事务运行的handleExistingTransaction中,我们能找到剩下事务传播行为的处理流程

已有事务处理

handleExistingTransaction中,我们可以看到事务的传播行为在方法已有其他事务运行时的不同处理策略:

  1. 当传播行为是PROPAGATION_NEVER,则直接抛出IllegalTransactionStateException
  2. 当传播行为是PROPAGATION_NOT_SUPPORTED,会先挂起已有事务,然后根据已挂起的事务资源创建一个空事务
  3. 当传播行为是PROPAGATION_REQUIRES_NEW,会先挂起已有事务,然后创建开启一个新事务
  4. 当传播行为是PROPAGATION_NESTED,会基于已有事务,创建保存点,然后返回已有事务
  5. 当传播行为是PROPAGATION_SUPPORTSPROPAGATION_REQUIRED,则会先判断事务属性和已有事务的一致性(隔离级别与是否只读),然后返回已有事务

事务同步管理器

AbstractPlatformTransactionManager对事务创建和开启的过程中,有一个TransactionSynchronizationManager在事务的创建、挂起和恢复中经常能看到:

//事务同步状态就绪
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
       TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
       TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
             definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                   definition.getIsolationLevel() : null);
       TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
       TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
       TransactionSynchronizationManager.initSynchronization();
    }
}

//事务同步状态挂起
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
       List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
       try {
          Object suspendedResources = null;
          if (transaction != null) {
             suspendedResources = doSuspend(transaction);
          }
          String name = TransactionSynchronizationManager.getCurrentTransactionName();
          TransactionSynchronizationManager.setCurrentTransactionName(null);
          boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
          TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
          Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
          TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
          boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
          TransactionSynchronizationManager.setActualTransactionActive(false);
          return new SuspendedResourcesHolder(
                suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
       }
       catch (RuntimeException | Error ex) {
          // doSuspend failed - original transaction is still active...
          doResumeSynchronization(suspendedSynchronizations);
          throw ex;
       }
    }
    else if (transaction != null) {
       // Transaction active but no synchronization active.
       Object suspendedResources = doSuspend(transaction);
       return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
       // Neither transaction nor synchronization active.
       return null;
    }
}

//事务同步状态恢复
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
       throws TransactionException {

    if (resourcesHolder != null) {
       Object suspendedResources = resourcesHolder.suspendedResources;
       if (suspendedResources != null) {
          doResume(transaction, suspendedResources);
       }
       List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
       if (suspendedSynchronizations != null) {
          TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
          TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
          TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
          TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
          doResumeSynchronization(suspendedSynchronizations);
       }
    }
}

显而易见,事务方法所在线程拥有的事务资源都是通过TransactionSynchronizationManager管理,并由TransactionSynchronizationManager维护当前线程的事务活跃状态。

查看这个类,我们发现其果然由6个ThreadLocal修饰的变量组成:

private static final ThreadLocal<Map<Object, Object>> resources =
       new NamedThreadLocal<>("Transactional resources");

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
       new NamedThreadLocal<>("Transaction synchronizations");

private static final ThreadLocal<String> currentTransactionName =
       new NamedThreadLocal<>("Current transaction name");

private static final ThreadLocal<Boolean> currentTransactionReadOnly =
       new NamedThreadLocal<>("Current transaction read-only status");

private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
       new NamedThreadLocal<>("Current transaction isolation level");

private static final ThreadLocal<Boolean> actualTransactionActive =
       new NamedThreadLocal<>("Actual transaction active");

其分别为当前线程拥有的事务资源,事务同步信号量,当前事务名称,当前事务只读状态,当前事务隔离级别,当前事务活跃状态组成。

  • 当我们创建事务时,当前事务名称,当前事务只读状态,当前事务隔离级别,当前事务活跃状态设置为我们当前在执行的事务信息,并为此事务创建对应的事务同步信号量
  • 当我们恢复事务时,会将当前事务名称,当前事务只读状态,当前事务隔离级别,当前事务活跃状态设置为我们当前在执行的事务信息,并将恢复事务同步信号量,将此事务绑定对应的事务资源
  • 当我们挂起事务时,则会将当前事务名称,当前事务只读状态,当前事务隔离级别,当前事务活跃状态设置为空,并挂起事务同步信号量,将此事务对应的事务资源解绑

事务提交和回滚

事务提交和回滚在TransactionAspectSupport的事务方法处理过程中已经提及:

//事务回滚
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
       if (logger.isTraceEnabled()) {
          logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                "] after exception: " + ex);
       }
       if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
          try {
              //调用事务管理器进行回滚
             txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
          }
          catch (TransactionSystemException ex2) {
             logger.error("Application exception overridden by rollback exception", ex);
             ex2.initApplicationException(ex);
             throw ex2;
          }
          catch (RuntimeException | Error ex2) {
             logger.error("Application exception overridden by rollback exception", ex);
             throw ex2;
          }
       }
       else {
          // 如果异常回滚不包含对应异常,则正常提交事务
          try {
             txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
          }
          catch (TransactionSystemException ex2) {
             logger.error("Application exception overridden by commit exception", ex);
             ex2.initApplicationException(ex);
             throw ex2;
          }
          catch (RuntimeException | Error ex2) {
             logger.error("Application exception overridden by commit exception", ex);
             throw ex2;
          }
       }
    }
}

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
       if (logger.isTraceEnabled()) {
          logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
       }
       txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

不同组件的事务管理器

在上述事务管理的过程中,事务的开启、挂起、恢复、回滚和提交其实是由不同持久化组件自定义实现逻辑的,AbstractPlatformTransactionManager只是其中的构建者,而不同框架则为事物的管理提供了不同的扳手和螺丝刀:

image-20240901160207089

我们以JDBC为例,其在DataSourceTransactionManager中提供了事务的开启、挂起、恢复、回滚和提交一种实现方式:

//获取数据库连接,开启事务
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
       if (!txObject.hasConnectionHolder() ||
             txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
          //创建数据库连接
          Connection newCon = obtainDataSource().getConnection();
          if (logger.isDebugEnabled()) {
             logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
          }
          txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
       }

       txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
       con = txObject.getConnectionHolder().getConnection();

       Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
       txObject.setPreviousIsolationLevel(previousIsolationLevel);
       txObject.setReadOnly(definition.isReadOnly());

       // 手动将数据库连接中的自动提交关闭
       if (con.getAutoCommit()) {
          txObject.setMustRestoreAutoCommit(true);
          if (logger.isDebugEnabled()) {
             logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
          }
          con.setAutoCommit(false);
       }
	   //创建事务连接
       prepareTransactionalConnection(con, definition);
       txObject.getConnectionHolder().setTransactionActive(true);

       int timeout = determineTimeout(definition);
       if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
          txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
       }

       // 为当前线程绑定数据库连接
       if (txObject.isNewConnectionHolder()) {
          TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
       }
    }

    catch (Throwable ex) {
       if (txObject.isNewConnectionHolder()) {
          DataSourceUtils.releaseConnection(con, obtainDataSource());
          txObject.setConnectionHolder(null, false);
       }
       throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

//事务挂起
@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

//事务恢复
@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

//事务提交
@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
       logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
       con.commit();
    }
    catch (SQLException ex) {
       throw translateException("JDBC commit", ex);
    }
}

//事务回滚
@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
       logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
       con.rollback();
    }
    catch (SQLException ex) {
       throw translateException("JDBC rollback", ex);
    }
}

总结

通过上述源码解读我们可以发现spring事务的管理是将事务绑定到事务方法所属的线程上,然后通过事务管理器实现对此线程所有事务,以及同一方法多个事务的管理。简单图解为如下:

image-20240901201233024

通过源码解读,我们了解了spring事务的运行机制,以及不同事务传播机制的实现方式,这方便我们以后在多线程环境和高并发服务开发中做更好的拓展。

0

评论区