本文和大家一起刨析 Spring 事务的相关源码,篇幅较长,代码片段较多,建议使用电脑阅读
本文目标
- 理解Spring事务管理核心接口
- 理解Spring事务管理的核心逻辑
- 理解事务的传播类型及其实现原理
版本
SpringBoot 2.3.3.RELEASE
什么是事务的传播?
Spring 除了封装了事务控制之外,还抽象出了 事务的传播 这个概念,事务的传播并不是关系型数据库所定义的,而是Spring在封装事务时做的增强扩展,可以通过@Transactional
指定事务的传播,具体类型如下
事务传播行为类型 | 说明 |
---|---|
PROPAGATION_REQUIRED |
如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。Spring的默认事务传播类型 |
PROPAGATION_SUPPORTS |
支持当前事务,如果当前没有事务,就以非事务方式执行。 |
PROPAGATION_MANDATORY |
使用当前的事务,如果当前没有事务,就抛出异常。 |
PROPAGATION_REQUIRES_NEW |
新建事务,如果当前存在事务,把当前事务挂起(暂停)。 |
PROPAGATION_NOT_SUPPORTED |
以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。 |
PROPAGATION_NEVER |
以非事务方式执行,如果当前存在事务,则抛出异常。 |
PROPAGATION_NESTED |
如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。 |
举个栗子
以嵌套事务为例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class DemoServiceImpl implements DemoService {
private JdbcTemplate jdbcTemplate;
private DemoServiceImpl self;
public void insertDB() {
String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";
jdbcTemplate.update(sql, uuid(), "taven");
try {
// 内嵌事务将会回滚,而外部事务不会受到影响
self.nested();
} catch (Exception e) {
e.printStackTrace();
}
}
(propagation = Propagation.NESTED)
public void nested() {
String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";
jdbcTemplate.update(sql, uuid(), "nested");
throw new RuntimeException("rollback nested");
}
private String uuid() {
return UUID.randomUUID().toString();
}
}
上述代码中,nested()方法标记了事务传播类型为嵌套,如果nested()
中抛出异常仅会回滚nested()
方法中的sql,不会影响到insertDB()
方法中已经执行的sql
注意:service 调用内部方法时,如果直接使用this调用,事务不会生效。因此使用this调用相当于跳过了外部的代理类,所以AOP不会生效,无法使用事务
思考
众所周知,Spring 事务是通过AOP实现的,如果是我们自己写一个AOP控制事务,该怎么做呢?1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// 伪代码
public Object invokeWithinTransaction() {
// 开启事务
connection.beginTransaction();
try {
// 反射执行方法
Object result = invoke();
// 提交事务
connection.commit();
return result;
} catch(Exception e) {
// 发生异常时回滚
connection.rollback();
throw e;
}
}
在这个基础上,我们来思考一下如果是我们自己做的话,事务的传播该如何实现
以PROPAGATION_REQUIRED
为例,这个似乎很简单,我们判断一下当前是否有事务(可以考虑使用ThreadLocal存储已存在的事务对象),如果有事务,那么就不开启新的事务。反之,没有事务,我们就创建新的事务
如果事务是由当前切面开启的,则提交/回滚事务,反之不做处理
那么事务传播中描述的挂起(暂停)当前事务,和内嵌事务是如何实现的?
源码入手
要阅读事务传播相关的源码,我们先来了解下Spring 事务管理的核心接口与类
TransactionDefinition
该接口定义了事务的所有属性(隔离级别,传播类型,超时时间等等),我们日常开发中经常使用的@Transactional
其实最终会被转化为 TransactionDefinitionTransactionStatus
事务的状态,以最常用的实现 DefaultTransactionStatus 为例,该类存储了当前的事务对象,savepoint,当前挂起的事务,是否完成,是否仅回滚等等TransactionManager
这是一个空接口,直接继承他的 interface 有 PlatformTransactionManager(我们平时用的就是这个,默认的实现类DataSourceTransactionManager)以及
ReactiveTransactionManager(响应式事务管理器,由于不是本文重点,我们不多说)
从上述两个接口来看,TransactionManager 的主要作用
- 通过TransactionDefinition开启一个事务,返回TransactionStatus
- 通过TransactionStatus 提交、回滚事务(实际开启事务的Connection通常存储在TransactionStatus中)
1 | public interface PlatformTransactionManager extends TransactionManager { |
- TransactionInterceptor
事务拦截器,事务AOP的核心类(支持响应式事务,编程式事务,以及我们常用的标准事务),由于篇幅原因,本文只讨论标准事务的相关实现
下面我们从事务逻辑的入口 TransactionInterceptor 入手,来看下Spring事务管理的核心逻辑以及事务传播的实现
TransactionInterceptor
TransactionInterceptor 实现了MethodInvocation(这是实现AOP的一种方式),其核心逻辑在父类TransactionAspectSupport 中,方法位置:TransactionInterceptor::invokeWithinTransaction
1 | protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, |
这里代码很多,根据注释的位置,我们可以把核心逻辑梳理出来
- 获取当前事务属性,事务管理器(以注解事务为例,这些都可以通过
@Transactional
来定义) createTransactionIfNecessary
,判断是否有必要创建事务invocation.proceedWithInvocation
执行拦截器链,最终会执行到目标方法completeTransactionAfterThrowing
当抛出异常后,完成这个事务,提交或者回滚,并抛出这个异常commitTransactionAfterReturning
从方法命名来看,这个方法会提交事务。
但是深入源码中会发现,该方法中也包含回滚逻辑,具体行为会根据当前TransactionStatus的一些状态来决定(也就是说,我们也可以通过设置当前TransactionStatus,来控制事务回滚,并不一定只能通过抛出异常),详见AbstractPlatformTransactionManager::commit
我们继续,来看看createTransactionIfNecessary做了什么
TransactionAspectSupport::createTransactionIfNecessary
1 | protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, |
createTransactionIfNecessary中的核心逻辑
- 通过PlatformTransactionManager(事务管理器)开启事务
prepareTransactionInfo
准备事务信息,这个具体做了什么我们稍后再讲
继续来看PlatformTransactionManager::getTransaction
,该方法只有一个实现 AbstractPlatformTransactionManager::getTransaction
1 | public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) |
代码很多,重点关注注释部分即可
doGetTransaction
获取当前事务- 如果存在事务,则调用
handleExistingTransaction
处理,这个我们稍后会讲到
接下来,会根据事务的传播决定是否开启事务
- 如果事务传播类型为
PROPAGATION_MANDATORY
,且不存在事务,则抛出异常 - 如果传播类型为
PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED
,且当前不存在事务,则调用startTransaction
创建事务 - 当不满足 3、4时,例如
PROPAGATION_NOT_SUPPORTED
,此时会执行事务同步,但是不会创建真正的事务
Spring 事务同步在之前一篇博客中有讲到,传送门👉https://www.jianshu.com/p/7880d9a98a5f
Spring 如何管理当前的事务
接下来讲讲上面提到的doGetTransaction
、handleExistingTransaction
,这两个方法是由不同的TransactionManager自行实现的
我们以SpringBoot默认的TransactionManager,DataSourceTransactionManager为例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
结合 AbstractPlatformTransactionManager::getTransaction
一起来看,doGetTransaction
其实获取的是当前的Connection。
判断当前是否存在事务,是判断DataSourceTransactionObject 对象中是否包含connection,以及connection是否开启了事务。
我们继续来看下TransactionSynchronizationManager.getResource(obtainDataSource())
获取当前connection的逻辑
TransactionSynchronizationManager::getResource
1 | private static final ThreadLocal<Map<Object, Object>> resources = |
看到这里,我们能明白DataSourceTransactionManager是如何管理线程之间的Connection,ThreadLocal 中存储一个Map,key为数据源对象,value为该数据源在当前线程的Connection
DataSourceTransactionManager 在开启事务后,会调用TransactionSynchronizationManager::bindResource
将指定数据源的Connection绑定到当前线程
AbstractPlatformTransactionManager::handleExistingTransaction
我们继续回头看,如果存在事务的情况,如何处理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 如果事务的传播要求以非事务方式执行 抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED 如果存在事务,则挂起当前事务,以非事务方式执行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 挂起当前事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 构建一个无事务的TransactionStatus
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW 如果存在事务,则挂起当前事务,新建一个事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED 内嵌事务,就是我们开头举得例子
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 非JTA事务管理器都是通过savePoint实现的内嵌事务
// savePoint:关系型数据库中事务可以创建还原点,并且可以回滚到还原点
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 创建还原点
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// 如果执行到这一步传播类型一定是,PROPAGATION_SUPPORTS 或者 PROPAGATION_REQUIRED
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 校验目前方法中的事务定义和已存在的事务定义是否一致
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 构建一个TransactionStatus,但不开启事务
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
这里代码很多,逻辑看上述注释即可。这里终于看到了期待已久的挂起事务和内嵌事务了,我们还是看一下DataSourceTransactionManager的实现
- 挂起事务:通过
TransactionSynchronizationManager::unbindResource
根据数据源获取当前的Connection,并在resource中移除该Connection。之后会将该Connection存储到TransactionStatus对象中
1 | // DataSourceTransactionManager::doSuspend |
在事务提交或者回滚后,调用 AbstractPlatformTransactionManager::cleanupAfterCompletion
会将TransactionStatus 中缓存的Connection重新绑定到resource中
- 内嵌事务:通过关系型数据库的savePoint实现,提交或回滚的时候会判断如果当前事务为savePoint则释放savePoint或者回滚到savePoint,具体逻辑参考
AbstractPlatformTransactionManager::processRollback
和AbstractPlatformTransactionManager::processCommit
至此,事务的传播源码分析结束
prepareTransactionInfo
上文留下了一个问题,prepareTransactionInfo 方法做了什么,我们先来看下TransactionInfo
的结构1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18protected static final class TransactionInfo {
private final PlatformTransactionManager transactionManager;
private final TransactionAttribute transactionAttribute;
private final String joinpointIdentification;
private TransactionStatus transactionStatus;
private TransactionInfo oldTransactionInfo;
// ...
}
该类在Spring中的作用,是为了内部传递对象。ThreadLocal中存储了最新的TransactionInfo,通过当前TransactionInfo可以找到他的oldTransactionInfo。每次创建事务时会新建一个TransactionInfo(无论有没有真正的事务被创建)存储到ThreadLocal中,在每次事务结束后,会将当前ThreadLocal中的TransactionInfo重置为oldTransactionInfo,这样的结构形成了一个链表,使得Spring事务在逻辑上可以无限嵌套下去
如果觉得有收获,可以关注我的公众号,你的点赞和关注就是对我最大的支持