记忆、淡忘

Spring 事务

简介

spring支持编程式事务管理和声明式事务管理两种方式:
  • 编程式事务是在代码中直接加入处理事务的逻辑,可能需要在代码中显式调用beginTransaction()、commit()、rollback()等事务管理相关的方法。编程式事务管理使用TransactionTemplate或者直接使用底层的PlatformTransactionManager。对于编程式事务管理,spring推荐使用TransactionTemplate。
  • 声明式事务管理建立在AOP之上的。其本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务。声明式事务最大的优点就是不需要通过编程的方式管理事务,这样就不需要在业务逻辑代码中掺杂事务管理的代码,只需在配置文件中做相关的事务规则声明(或通过基于@Transactional注解的方式),便可以将事务规则应用到业务逻辑中。

二者区别:编程式事务侵入性比较强,但处理粒度更细.

声明式事务

声明式事务是通过AOP来实现的,在Spring中,通过TransactionProxyFactoryBean生成代理对象,在代理对象中通过TransactionInterceptor来完成对代理方法的拦截。

事务拦截器的使用

配置TransactionProxyFactoryBean的XML文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<!--
Spring中常用事务类型:
PROPAGATION_REQUIRED 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。
PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY 支持当前事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。
-->
<!-- 事务代理、每个Bean都有一个代理-->
<bean id="userService" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
<property name="transactionManager" ref="transactionManager"></property>
<property name="transactionAttributes">
<props>
<prop key="*">PROPAGATION_REQUIRED</prop>
</props>
</property>
<property name="proxyInterfaces">
<!--接口代理-->
<value>app.service.UserService</value>
</property>
<property name="target" >
<bean class="app.service.impl.UserServiceImpl"></bean>
</property>
</bean>
<!-- 事务代理、所有Bean共享一个代理基类
<bean id="transactionBase" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean" abstract="true">
<property name="transactionManager" ref="transactionManager"></property>
<property name="transactionAttributes">
<props>
<prop key="*">PROPAGATION_REQUIRED</prop>
</props>
</property>
</bean>
<bean id="userService" parent="transactionBase" >
<property name="target">
<bean class="app.service.impl.UserServiceImpl"/>
</property>
</bean>
-->

下面看一TransactionProxyFactoryBean的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean implements BeanFactoryAware {
//定义拦截器,通过这个拦截器来实现事务
private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();
private Pointcut pointcut;
public TransactionProxyFactoryBean() {
}
//注入transactionManager
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionInterceptor.setTransactionManager(transactionManager);
}
//事务管理的属性注入到TransactionInterceptor中,Properties的形式
public void setTransactionAttributes(Properties transactionAttributes) {
this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
}
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
this.transactionInterceptor.setTransactionAttributeSource(transactionAttributeSource);
}
public void setPointcut(Pointcut pointcut) {
this.pointcut = pointcut;
}
public void setBeanFactory(BeanFactory beanFactory) {
this.transactionInterceptor.setBeanFactory(beanFactory);
}
//创建事务处理的Advisor
protected Object createMainInterceptor() {
this.transactionInterceptor.afterPropertiesSet();
return this.pointcut != null?new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor):new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
}
protected void postProcessProxyFactory(ProxyFactory proxyFactory) {
proxyFactory.addInterface(TransactionalProxy.class);
}
}

下面我们看一下transactionInterceptor是如何成为Advisor的,由于AbstractSingletonProxyFactoryBean类实现了InitializingBean接口,因此在Bean初始化过程完成依赖注入的时候调用了afterPropertiesSet方法,具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public void afterPropertiesSet() {
//目标对象是必须的
if (this.target == null) {
throw new IllegalArgumentException("Property 'target' is required");
}
//目标对象必须是bean的引用
if (this.target instanceof String) {
throw new IllegalArgumentException("'target' needs to be a bean reference, not a bean name as value");
}
if (this.proxyClassLoader == null) {
this.proxyClassLoader = ClassUtils.getDefaultClassLoader();
}
//创建代理工厂
ProxyFactory proxyFactory = new ProxyFactory();
//如果在事务拦截器之前配置了额外的拦截器
if (this.preInterceptors != null) {
//将这些事务之前的额外拦截器添加到通知器中
for (Object interceptor : this.preInterceptors) {
proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(interceptor));
}
}
// 加入通知器,createMainInterceptor()方法在TransactionProxyFactoryBean实现
proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(createMainInterceptor()));
if (this.postInterceptors != null) {
for (Object interceptor : this.postInterceptors) {
proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(interceptor));
}
}
proxyFactory.copyFrom(this);
//创建AOP的目标源
TargetSource targetSource = createTargetSource(this.target);
proxyFactory.setTargetSource(targetSource);
//配置目标接口
if (this.proxyInterfaces != null) {
proxyFactory.setInterfaces(this.proxyInterfaces);
}
else if (!isProxyTargetClass()) {
// 根据目标对象获取代理接口
proxyFactory.setInterfaces(
ClassUtils.getAllInterfacesForClass(targetSource.getTargetClass(), this.proxyClassLoader));
}
postProcessProxyFactory(proxyFactory);
//生成代理对象
this.proxy = proxyFactory.getProxy(this.proxyClassLoader);
}

经过一系列的步骤,Spring的事务拦截器TransactionIntercepter(AOP的Advice)配置到ProxyFactory生成的AOP代理对象中。

事务拦截器的配置

TransactionInterceptor的父类TransactionAspectSupport中配置transactionAttributeSource属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public void setTransactionAttributes(Properties transactionAttributes) {
NameMatchTransactionAttributeSource tas = new NameMatchTransactionAttributeSource();
tas.setProperties(transactionAttributes);
this.transactionAttributeSource = tas;
}
public class NameMatchTransactionAttributeSource implements TransactionAttributeSource, Serializable {
protected static final Log logger = LogFactory.getLog(NameMatchTransactionAttributeSource.class);
/** Keys are method names; values are TransactionAttributes */
private Map<String, TransactionAttribute> nameMap = new HashMap<>();
public void setNameMap(Map<String, TransactionAttribute> nameMap) {
for (Map.Entry<String, TransactionAttribute> entry : nameMap.entrySet()) {
addTransactionalMethod(entry.getKey(), entry.getValue());
}
}
//设置配置的事务方法
public void setProperties(Properties transactionAttributes) {
TransactionAttributeEditor tae = new TransactionAttributeEditor();
Enumeration<?> propNames = transactionAttributes.propertyNames();
while (propNames.hasMoreElements()) {
String methodName = (String) propNames.nextElement();
String value = transactionAttributes.getProperty(methodName);
tae.setAsText(value);
TransactionAttribute attr = (TransactionAttribute) tae.getValue();
addTransactionalMethod(methodName, attr);
}
}
public void addTransactionalMethod(String methodName, TransactionAttribute attr) {
if (logger.isDebugEnabled()) {
logger.debug("Adding transactional method [" + methodName + "] with attribute [" + attr + "]");
}
this.nameMap.put(methodName, attr);
}
//对调用的方法进行判断,判断是否是事务方法,如果是事务方法,则取出相应的事务配置
@Override
public TransactionAttribute getTransactionAttribute(Method method, Class<?> targetClass) {
if (!ClassUtils.isUserLevelMethod(method)) {
return null;
}
// 直接通过方法名进行匹配
String methodName = method.getName();
TransactionAttribute attr = this.nameMap.get(methodName);
if (attr == null) {
// 不能直接匹配,通过PatternMatchUtils.simpleMatch来进行匹配
String bestNameMatch = null;
for (String mappedName : this.nameMap.keySet()) {
if (isMatch(methodName, mappedName) &&
(bestNameMatch == null || bestNameMatch.length() <= mappedName.length())) {
attr = this.nameMap.get(mappedName);
bestNameMatch = mappedName;
}
}
}
return attr;
}
//事务方法的匹配判断
//支持简单的匹配规则: "xxx*", "*xxx", "*xxx*" "xxx*yyy" 等
protected boolean isMatch(String methodName, String mappedName) {
return PatternMatchUtils.simpleMatch(mappedName, methodName);
}
//省略部分代码
}

事务拦截器的拦截

在AOP中我们知道,在拦截器中会有一个invoke()方法,该方法是代理对象的回调方法,下面我们看一下TransactionInterceptor中的invoke方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public Object invoke(final MethodInvocation invocation) throws Throwable {
//获取代理的目标对象
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 调用TransactionAspectSupport的invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
// 如果事务配置属性TransactionAttribut为null 则 没有事务
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
//获取具体的事务处理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
//区分不同类型的PlatformTransactionManager事务处理器,不同类型的事务处理器调用方式不同。
//对CallbackPreferringPlatformTransactionManager,需要回调函数来实现事务的创建和提交,
//对非CallbackPreferringPlatformTransactionManager来说,则不需要使用回调函数来实现事务处理。
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
//声明式事务
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
//创建事务,将当前事务状态和信息保存到TransactionInfo对象中
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
//沿着拦截器链调用处理,使得最后目标对象的方法得到调用
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
//在调用拦截器拦过程中出现异常,则根据事务配置进行提交或回滚处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//通过TransactionInfo设置城oldTransactionInfo的方式清除与当前线程绑定的事务信息
cleanupTransactionInfo(txInfo);
}
//事务提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
//编程式事务
// 通过回调使用事务
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
return new ThrowableHolder(ex);
}
}
finally {
cleanupTransactionInfo(txInfo);
}
}
});
// Check result: It might indicate a Throwable to rethrow.
if (result instanceof ThrowableHolder) {
throw ((ThrowableHolder) result).getThrowable();
}
else {
return result;
}
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}

Spring的事务管理是通过TransactionInfo对象来完成的,在该对象中,封装了事务对象的和事务处理的状态信息。

编程式事务

spring编程式事务使用的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void add(String name, Integer age) {
TransactionDefinition txDefinition = new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED) ;
TransactionStatus status = transactionManager.getTransaction(txDefinition);
try{
save("insert into user(user_name, age,password) values(?, ?,?)", name, age,"111111");
System.out.print(1/0);
}catch (Exception e){
transactionManager.rollback(status);
throw e;
}
transactionManager.commit(status);
}

在上面的代码中,通过DefaultTransactionDefinition对象持有事务处理属性,在创建事务的过程中得到一个TransactionStatus对象,然后通过transactionManager的commit()和rollback()方法来完成事务的处理。
这段代码是自己写的较简单的事务代码,在上面的invoke()方法中,较详细的使用了编程式事务。

注意:Spring事务默认情况下只对RuntimeException进行回滚。

事务的源码分析

虽然事务的使用有两种方法,但从实用性的角度而言,我们在开发中更多的是使用了声明式事务,下面我们来分析这个步骤:

1、获取事务属性,加载配置中的TransactioManager,不同的事务处理方式使用不同的逻辑。
2、在目标方法执行前获取事务并收集事务信息。
3、执行目标方法(出现异常,尝试异常处理,默认只对RunTimeException和Err回滚),执行成功则提交事务。
4、清除事务信息。

事务的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, final TransactionAttribute txAttr, final String joinpointIdentification) {
//如果没有名称则使用方法唯一标识,并用DelegatingTransactionAttribute封装txAttr
if(txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if(txAttr != null) {
if(tm != null) {
//获取事务的状态
status = tm.getTransaction((TransactionDefinition)txAttr);
} else if(this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
}
}
//根据属性与status生成一个TransactionInfo
return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}

上面的方法中主要是DelegatingTransactionAttribute封装了传入的TransactionAttribute实例。

获取事务

下面我们看一下获取事务的代码

getTransaction来处理事务的准备工作,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
//判断当前线程是否存在事务(根据当前线程的连接以及连接中的transactionActive字段判断)
if (isExistingTransaction(transaction)) {
// 当前线程存在事务的处理
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 事务超时设置
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// 事务不存在,根据配置的事务属性选择执行操作
/**
*PROPAGATION_REQUIRED--支持当前事务,如果当前没有事务,就新建一个事务
*PROPAGATION_SUPPORTS--支持当前事务,如果当前没有事务,就以非事务方式执行。
*PROPAGATION_MANDATORY--支持当前事务,如果当前没有事务,就抛出异常。
*PROPAGATION_REQUIRES_NEW--新建事务,如果当前存在事务,把当前事务挂起。
*PROPAGATION_NOT_SUPPORTED--以非事务方式执行操作,如果当前存在事务,把当前事务挂起
*PROPAGATION_NEVER--以非事务方式执行,如果当前存在事务,则抛出异常。
*PROPAGATION_NESTED--如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行 * 与PROPAGATION_REQUIRED类似的操作。
**/
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//空挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//创建DefaultTransactionStatus,完善transaction
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}

我们具体分析一下上面代码中的操作,事务的操作以DataSourceManager为例:

创建事务实例
1
2
3
4
5
6
7
8
9
10
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//如果当前线程已经记录数据库连接,则使用原有连接
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
//false 表示不是新建连接
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
事务设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//判断数据库连接是否存在(事务同步标识设为true的也需要重新获取连接),
//新连接则绑定到当前线程
if (txObject.getConnectionHolder() == null ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//设置隔离级别、只读标识
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 修改默认提交的属性,由Spring控制提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
//标识当前线程已存在事务
txObject.getConnectionHolder().setTransactionActive(true);
//设置过期时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 将连接绑定到当前线程
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}

事务的隔离是对数据库连接的设置,在prepareConnectionForTransaction方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static Integer prepareConnectionForTransaction(Connection con, TransactionDefinition definition)
throws SQLException {
Assert.notNull(con, "No Connection specified");
// 设置数据库连接的只读标识
if (definition != null && definition.isReadOnly()) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Setting JDBC Connection [" + con + "] read-only");
}
con.setReadOnly(true);
}
catch (SQLException ex) {
Throwable exToCheck = ex;
while (exToCheck != null) {
if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
// Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
throw ex;
}
exToCheck = exToCheck.getCause();
}
// "read-only not supported" SQLException -> ignore, it's just a hint anyway
logger.debug("Could not set JDBC Connection read-only", ex);
}
catch (RuntimeException ex) {
Throwable exToCheck = ex;
while (exToCheck != null) {
if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
// Assume it's a connection timeout that would otherwise get lost: e.g. from Hibernate
throw ex;
}
exToCheck = exToCheck.getCause();
}
// "read-only not supported" UnsupportedOperationException -> ignore, it's just a hint anyway
logger.debug("Could not set JDBC Connection read-only", ex);
}
}
// 设置数据库连接的隔离级别
Integer previousIsolationLevel = null;
if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
if (logger.isDebugEnabled()) {
logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
definition.getIsolationLevel());
}
int currentIsolation = con.getTransactionIsolation();
if (currentIsolation != definition.getIsolationLevel()) {
previousIsolationLevel = currentIsolation;
con.setTransactionIsolation(definition.getIsolationLevel());
}
}
return previousIsolationLevel;
}
同步事务信息设置到当前线程
1
2
3
4
5
6
7
8
9
10
11
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
处理已存在事务

在上一小节中我们分析了新建事务的处理,下面看一下已存在的事务是如何操作的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* Create a TransactionStatus for an existing transaction.
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//当前存在事务,则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//当前存在事务,就把当前事务挂起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//当前存在事务,把当前事务挂起,并建一个新的事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
//当前存在事务,则在嵌套事务内执行
//如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// 创建保存点,以便控制事务的回滚
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// JTA transaction 不能使用保存点操作(为什么?),新建事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
记录事务信息

当已经建立事务链接并完成了事务信息的提取之后,我们要将事务信息统一纪录在TransactionInfo的实例中,这个实例记录了目标方法开始前的状态信息,一旦事务执行失败,再根据实例中的信息进行回滚等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled())
logger.trace("Don't need to create transaction for [" + joinpointIdentification +
"]: This method isn't transactional.");
}
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}

事务回滚

我们在分析事务拦截器的回调方法中可以发现,当目标方法执行之后会出现两种情况:出现异常事务回滚、目标方法执行成功事务提交。下面我们分析一下事务的回滚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 判断是否是Err或RuntimeException异常,是的话回滚
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by rollback error", ex);
throw err;
}
}
else {
// 即使是异常,不满足条件的话一样提交事务
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by commit error", ex);
throw err;
}
}
}
}

目标方法执行过程中,出现Throwable的情况就会进入当前方法,但不是所有的Throwable都会被异常处理,只有当异常是RuntimeException和Err的情况下事务才会回滚,其他的异常下数据依旧会被提交。

是否回滚

通过txInfo.transactionAttribute.rollbackOn(ex)来判断当前异常是否需要回滚

1
2
3
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}

默认情况下只有RuntimeException和Err的的异常才会回滚,但是我们可以扩展来改变。不过更常用的是注解方式来改变异常回滚信息(这里不分析注解的事务,之后会进行分析)。

回滚处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus);
}
private void processRollback(DefaultTransactionStatus status) {
try {
try {
triggerBeforeCompletion(status);
//判断是否存在保存点。如果有则回退到保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
//当前事务为独立的新事务,直接回退
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
//当前线程不是独立事务,则标记状态,等事务链执行完毕后统一回退
else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
cleanupAfterCompletion(status);
}
}
回滚后信息清除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
//设置完成状态
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
//结束事务的挂起状态
resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
//清除当前线程绑定的事务信息
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 从当前线程释放数据库连接
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(this.dataSource);
}
// Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
//恢复数据库自动提交
con.setAutoCommit(true);
}
//重置数据库连接,设置只读标志位与隔离级别
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
//如果当前事务是新建的事务,在事务完成时释放数据库连接
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}

事务提交

上一节我们分析了异常时的事务回滚,下面我们看一下目标方法正常执行后的事务提交:

1
2
3
4
5
6
7
8
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
//如果在事务链中标记回滚,则事务直接回滚(异常处理时事务即没有保存点也不是新的事务时标记的回滚状态)
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus);
// Throw UnexpectedRollbackException only at outermost transaction boundary
// or if explicitly asked to.
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
return;
}
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
//预留的接口
prepareForCommit(status);
//TransactionSynchronization的BeforeCommit方法调用
triggerBeforeCommit(status);
//TransactionSynchronization的BeforeCompletion方法调用
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
//判断是否存在保存点,存在则清楚
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
//判断是否是新的事务,新事物则提交
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}

代码的if条件判断两个点:事务状态是否存在保存点与事务是否是新的事务;这两个条件的判断主要是考虑到内嵌事务的情况,内嵌事务开始前设置的保存点在内嵌事务出现异常时方便回滚,如果没有异常,内嵌事务不会单独提交,而是由最外层的事务提交。

事务的提交最终还是由数据库连接进行的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}

为了学习,demo如下
TransactionProxyFactoryBean代理
transactionmanager 事务编程式使用
transactiontemplate 事务编程式使用
嵌套事务调用的各种情况