揭秘 @GlobalLock 隔离保障的微妙之处原创
一、前奏
在《深度解析 Seata AT 模式中性能优化与隔离保障的平衡之道》中有介绍 Seata AT 模式通过将传统 XA 方案的 2 个阶段的本地 DB 锁,拆分成了 1 阶段的本地锁
(DB 锁),和 2 阶段的全局锁
,且设计的初衷是第 2 阶段的全局锁
在绝大部分情况下非必要,也就是说若不利用 Seata 所提供的一些机制,就不会使用到 2 阶段的全局锁
,那么对大部分隔离性要求不高的场景来说,这样就提升了性能;但若对隔离性有要求,又没有基于 Seata 的规则使用全局锁
就可能会出现脏写和脏读的问题。
若要保障分布式事务修改中的写隔离,必须利用 Seata 中全局锁
机制,保障数据修改操作是互斥的,即不会造成写入脏数据;又尽量避免造成互等死锁。Seata AT 模式下,有两种方法来启用全局锁
-
通过
@GlobalTransactional
启用全局锁
-
功能层面的介绍查看
-
源码层面的介绍查看
-
另外一种是通过
@GlobalLock
启用全局锁
-
功能层面的介绍查看
@GlobalTransactional
基于完整的分布式事务能力保障写隔离,但其能力的完整性在有些场景下也显得太过繁重,而@GlobalLock
则作为一种轻量级的写隔离保障机制作为补充;本篇结合上层功能从源码层面对其进行详细的介绍。
1.1、脏写的产生
如下图示例,有业务一和业务二,业务一使用@Globaltransactional
开启事务调用updateA()
和 updateB()
方法,业务二没有全局事务,直接调用 updateA()
方法,updateA()
方法是修改表tb_a
中 id 值为 1 的记录中 count
字段的值:
-
业务一开启全局事务 -
业务一中分支事务 updateA()
将count
的值修改为 1,提交了本地事务 -
之后业务二中 updateA()
获取到锁,将count
的值修改为 2,提交了本地事务 -
业务一中分支事务 updateB()
出现了错误,导致业务一全局事务的回滚,在回滚updateA()
分支事务时,发现当前 count 字段的值并不是自己所修改的 1,程序逻辑就无法明确该回滚成什么值,导致无法进行自动回滚,需由人工介入排查矫正。
1.2、通过 @GlobalLock
保障写隔离
在 AT 模式下避免脏写的原理也很清晰,就是仅依靠本地锁无法避免,还要依赖 Seata TC 侧的全局锁,在需要全局写隔离的场景下,加入全局锁的判断逻辑即可避免脏写。全局锁
和本地锁
协作以保障数据修改操作是互斥的,不会造成写入脏数据;又尽量避免造成互等死锁。而通过@GlobalTransactional
很容易保障业务二的写隔离,过多细节咱不提,只需注意:业务二提交本地事务后,从其处理上下文来看,已经具备了写隔离性,但是不能就此打住,因为业务二所获取的全局锁还没释放,所以接下来还要再做全局事务的提交,通过提交全局事务将其所添加的全局锁释放掉。
此处是我们自省的关键,业务二只有一个服务,此服务提交了整个事务就提交了,不需要与其他其他服务协作,所以也不需要两阶段处理,那自然全局锁的申请和释放就略显多余了,可试试推理全局锁的添加是否有必要;注意看,业务二只有一个分支事务updateA()
,从上下文来看,保障其写隔离,只需要在获取本地锁后判断一下全局锁是否存在,若存在就等其他全局事务结束后再处理,若全局锁不存即表明没有其他全局事务存在,可放心提交本地事务;@GlobalLock
恰是如此实现,以应对不必依赖全局事务完整交互的这种场景
注意:@GlobalLock
中为何也有前后镜像的构建过程呢,关键在于要通过lockQuery
查询全局锁,而查询全局锁的参数 lockKeys
,需要通过前后镜像记录的主键等信息构建(比如insert
类的 sql,主键通过后镜像获取主键,delete
类的操作要通过前镜像获得,update
类的操作理论上前后镜像都可以)。
从源码视角看,@GlobalLock
注解内部的关键逻辑是Seata
通过Connection
代理,在commit
环节增强处理逻辑,检测不到冲突的全局行锁记录后,才提交本地事务;若检测到冲突的全局行锁记录就重试,@GlobalLock
注解中的lockRetryInternal
为重试间隔,lockRetryTimes
为重试次数。
二、关键逻辑导读
-
GlobalTransactionScanner#wrapIfNecessary
扫描 spring bean 时,判断是否有@GlobalLock
注解,识别到方法上的@GlobalLock
注解后,给 bean 加上 AOP 拦截器GlobalTransactionalInterceptor
。 -
拦截器的
invoke
方法内部是委托给GlobalLockTemplate#execute
在执行业务逻辑方法之前,在ThreadLocal
中打上需要全局行锁判断的标记 -
接下来,在处理业务逻辑执行中的 SQL 时,因 AT 模式是代理数据源做增强,即在处理 SQL 的环节,体现在
BaseTransactionalExecutor#execute
方法内。关键逻辑是如果从ThreadLocal
中识别到需要全局锁的标记,才做全局行锁的判断处理 -
在上一步提到的
BaseTransactionalExecutor#execute
中的代码比较复杂,Seata
的设计是在本地事务执行commit
的前一步(前后镜像构建之后)才做全局事务锁的处理(这样可以减少持锁时间),即ConnectionProxy#commit
内的doCommit
中,其内部因上下文不同有两种后续分支,一种是全局事务提交,一种是 globallock 检测+本地事务提交。 -
globallock 检测+本地事务提交这种情况的处理在代码
processLocalCommitWithGlobalLocks()
中,其中的逻辑是用ConnectionProxy#checkLock
判断有冲突的全局行锁是否存在(本事务所操作的行记录构造成行锁记录是否跟在 TC 侧已存在的全局行锁记录有重复),不存在的情况下通过targetConnection.commit()
提交本地事务;这个过程中并不会在 TC 中添加全局行锁记录。 -
如果
ConnectionProxy#checkLock
检测到全局行锁记录已存在的话,会抛出LockConflictException
,方法外部捕获这个异常后重试(有两种上下文的重试)。 -
重试的次数以及间隔控制体现在方法
lockRetryPolicy#doRetryOnLockConflict
中,而这里的次数和间隔就是@GlobalLock
注解中的lockRetryTimes
和lockRetryInternal
。 -
总结来说,
@GlobalLock
在检测不到冲突的全局事务行锁记录后,就提交本地事务,并没有插入全局行锁记录,也就是说在检测到没有冲突的全局行锁记录时,后续过程的隔离性是由本地事务来保障,即本地事务未提交的数据不会被其他的本地事务和分布式事务修改掉。三、相关源码导读
@GlobalLock
逻辑的关键代码分别体现在以下几处:
1) GlobalTransactionScanner#wrapIfNecessary
扫描 spring bean 时,判断方法上是否有@GlobalLock
注解,如果有则给这个 bean,添加拦截器GlobalTransactionalInterceptor
,也就是说被 @GlobalTransactional 和 @GlobalLock 标注后,Seata 通过 AOP 增强提供的分布式事务能力在 GlobalTransactionalInterceptor 中
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// ... TCC 部分暂略
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// 判断类或方法上是否有@GlobalTransactional 注解
// 判断方法上有否有 @GlobalLock 注解
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (globalTransactionalInterceptor == null) {
// 构建AOP的拦截器 GlobalTransactionalInterceptor
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
// 运行时监听是否禁用分布式事务,如果禁用,那么拦截器中就不再使用分布式事务的能力
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
// 下方getAdvicesAndAdvisorsForBean 方法中,就返回这个interceptor,
// 也就是说被 @GlobalTransactional 和 @GlobalLock 标注后,Seata通过AOP增强提供的分布式事务能力在 GlobalTransactionalInterceptor中
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
// 如果是普通的bean,走父类的方法生成代理类即可
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// 如果已经是代理类,获取到advisor后,添加到该集合即可
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// 根据上面的interceptor生成advisor
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
2) 拦截器GlobalTransactionalInterceptor
的invoke
方法中,判断分布式事务能力未被禁用的情况下,将标注了@GlobalLock 的方法,交给handleGlobalLock(xxx)
处理
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//通过 methodInvocation.getThis() 获取当前方法调用的所属对象
//通过 AopUtils.getTargetClass(xx) 获取当前对象的Class
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
// BridgeMethodResolver.findBridgedMethod https://cloud.tencent.com/developer/article/1656258
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 获取目标方法上 @GlobalTransactional 的信息
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
// 获取目标方法上 @GlobalLock 的信息,@GlobalTransactional 和 @GlobalLock 不该同时存在
// @GlobalTransactional 是开启全局事务
// @GlobalLock 是按照全局事务的隔离级别查看数据
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
// 禁用了,或者 开启了分布式事务能力降级,并且触发了降级的阈值
boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
// 通过 @GlobalTransactional的信息构建 全局事务的核心配置
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.rollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes(),
globalTransactionalAnnotation.lockStrategyMode());
} else {
transactional = this.aspectTransactional;
}
// 处理全局事务
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
// 处理全局锁
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
3) handleGlobalLock
方法中,采用模板方法模式,委托GlobalLockTemplate
处理,通过@GlobalLock 注解中的值构建出GlobalLockConfig
对象,用于控制全局锁获取的频率和尝试次数。把业务方法(methodInvocation.proceed()
)传入到globalLockTemplate.execute(...)
中执行。
private Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable {
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
// 将 @GlobalLock 注解中的重试间隔和重试次数设置到全局锁检测配置对象中。
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
}
4) io.seata.rm.GlobalLockTemplate#execute
的核心逻辑是根据上下文情况获取当前全局锁的配置,在执行业务逻辑方法之前,在ThreadLocal
中打上需要全局行锁判断的标记,后续逻辑会读取这个标记。在业务逻辑方法执行之后,也需要将标记从当前ThreadLocal
中移除。
public class GlobalLockTemplate {
//先判断当前是否已经在globalLock范围之内,如果已经在范围之内,那么把上层的配置取出来,用新的配置替换,
// 在方法执行完毕时候,释放锁,或者将配置替换成之前的上层配置
public Object execute(GlobalLockExecutor executor) throws Throwable {
boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
if (!alreadyInGlobalLock) {
//如果开启全局锁,会在threadLocal 放置一个标记 CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);
RootContext.bindGlobalLockFlag();
}
// set my config to config holder so that it can be access in further execution
// for example, LockRetryController can access it with config holder
// 在上下文中保存旧GlobalLock的配置,使用当前GlobalLock的配置
GlobalLockConfig myConfig = executor.getGlobalLockConfig();
GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
try {
return executor.execute();
} finally {
// only unbind when this is the root caller.
// otherwise, the outer caller would lose global lock flag
if (!alreadyInGlobalLock) {
RootContext.unbindGlobalLockFlag();
}
// if previous config is not null, we need to set it back
// so that the outer logic can still use their config
if (previousConfig != null) {
// 恢复旧Globallock的配置
GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
} else {
// 业务逻辑执行后,上下文中移除当前Globallock的配置
GlobalLockConfigHolder.remove();
}
}
}
}
5)接下来处理业务方法时,对于 Seata AT 模式来说,其关注点在于 SQL 的执行环节,因为 AT 模式是代理数据源后做增强,即在处理 SQL 的环节做增强,对 CRUD 操作,提供了多种 xxxExecutor,如DeleteExecutor
、UpdateExecutor
,Seata 在这些 xxxExecutor 的基类方法BaseTransactionalExecutor#execute
方法内从ThreadLocal
中获取需要全局锁的标记,传递给ConnectionProxy
,当然如果有标记才做全局行锁的判断处理
@Override
public T execute(Object... args) throws Throwable {
String xid = RootContext.getXID();
if (xid != null) {
statementProxy.getConnectionProxy().bind(xid);
}
// 从上下文中获取是否需要全局锁的标记,传递给ConnectionProxy
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
// 处理sql
return doExecute(args);
}
6)在上一步提到的 xxxExecutor 有好几种,处理不同类型的 SQL 的逻辑也比较复杂,对全局锁的判断这种逻辑属于公共逻辑,所以Seata
的设计是统一在本地事务执行commit
的前一步(前后镜像构建之后)才做全局事务锁的处理(这样可以减少持锁时间),即ConnectionProxy#commit
内的doCommit
中,其内部因上下文不同有两种后续分支,一种是全局事务提交,一种是 globallock 检测+本地事务提交。
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
// 处理全局事务
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
//申请到全局锁后执行本地提交
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
7) globallock 检测+本地事务提交这种情况的处理在代码processLocalCommitWithGlobalLocks()
中,其中的逻辑是用ConnectionProxy#checkLock
判断有冲突的全局行锁是否存在(本事务所操作的行记录构造成行锁记录是否跟在 TC 侧已存在的全局行锁记录有重复),不存在的情况下通过targetConnection.commit()
提交本地事务;这个过程中并不会在 TC 中添加全局行锁记录。
private void processLocalCommitWithGlobalLocks() throws SQLException {
// 询问TC是否有锁冲突,若有会抛出异常,不执行下边的commit();
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}
8)如果ConnectionProxy#checkLock
检测到全局行锁记录已存在的话,会抛出LockConflictException
public void checkLock(String lockKeys) throws SQLException {
if (StringUtils.isBlank(lockKeys)) {
return;
}
// Just check lock without requiring lock by now.
try {
// 请TC发送RPC请求,查询 lockKeys 在TC侧是否已存在
boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
if (!lockable) {
// lockKeys 已在TC侧所在的话,则是锁冲突,抛出LockConflictException异常
// ConnectionProxy.LockRetryPolicy.doRetryOnLockConflict()捕获此异常做重试管理
throw new LockConflictException(String.format("get lock failed, lockKey: %s",lockKeys));
}
} catch (TransactionException e) {
//lockQuery()中并未抛出异常,谁来抛出 TransactionException 呢?
recognizeLockKeyConflictException(e, lockKeys);
}
}
这里有个疑问,AT 模式下才有lockQuery
动作,在 TC 端对lockQuery
的具体实现在AbstractLockManager#isLockable()
中,但其中并没有抛出异常,所以上边的recognizeLockKeyConflictException
什么情况使用呢?
public boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException {
if (StringUtils.isBlank(lockKey)) {
// no lock
return true;
}
List<RowLock> locks = collectRowLocks(lockKey, resourceId, xid);
try {
return getLocker().isLockable(locks);
} catch (Exception t) {
LOGGER.error("isLockable error, xid:{} resourceId:{}, lockKey:{}", xid, resourceId, lockKey, t);
return false;
}
}
9)方法外部捕获这个异常后重试,源码中实际有两处重试策略,一种是AbstractDMLBaseExecutor#executeAutoCommitTrue
内,另一种是在 在ConnectionProxy#commit
中;两者根据上下文条件不同只有一处生效,简单来理解:
-
如果服务调用被 Spring 事务包括,那么 Spring 事务会将
AutoCommit
设置的 false,那么重试逻辑发生在ConnectionProxy#commit
中 -
如果服务调用没有被 Spring 事务包括,那么通常来说
AutoCommit
的值就是 true,那么重试逻辑发生在AbstractDMLBaseExecutor#executeAutoCommitTrue
中
对于第一种情况来说ConnectionProxy#commit
中的ConnectionProxy#doCommit
外层的重试管理逻辑lockRetryPolicy.execute(() -> {doCommit();...})
@Override
public void commit() throws SQLException {
...
// 重试管控
lockRetryPolicy.execute(() -> {
// doCommit()方法传递给 doRetryOnLockConflict
doCommit();
return null;
});
...
}
上述方法本质是将doCommit()
方法传递给方法 doRetryOnLockConflict
,其内部通过循环+sleep 的方式完成重试。除了重试管控的逻辑,尤其需要注意,在冲突的情况下,onException
方法中会有回滚操作,当重试执行
protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
// 循环
while (true) {
try {
return callable.call();
} catch (LockConflictException lockConflict) {
// 冲突的情况下,执行本地rollback();
onException(lockConflict);
// AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
if (connection.getContext().isAutoCommitChanged()
&& lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
}
// sleep方法里 重试 和 控制间隔;
// 超过次数抛出异常,退出循环
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
onException(e);
throw e;
}
}
}
在 LockRetryController#sleep
方法中控制 重试次数和 重试间隔,超过次数抛出异常,退出循环。
public void sleep(Exception e) throws LockWaitTimeoutException {
// prioritize the rollback of other transactions
// 重试次数控制
if (--lockRetryTimes < 0 || (e instanceof LockConflictException
&& ((LockConflictException)e).getCode() == TransactionExceptionCode.LockKeyConflictFailFast)) {
throw new LockWaitTimeoutException("Global lock wait timeout", e);
}
try {
// 通过sleep控制重试间隔
Thread.sleep(lockRetryInterval);
} catch (InterruptedException ignore) {
}
}
另外一种情况,即服务调用没有被 Spring 事务包括,那么通常来说AutoCommit
的值就是 true,那么重试逻辑发生在AbstractDMLBaseExecutor#executeAutoCommitTrue
中,虽然内部也会调用ConnectionProxy#commit
,但ConnectionProxy#commit
内的重试逻辑不会被执行。另外区别之处在于重试内的逻辑还多了业务 sql 的执行以及前后镜像的构建,即下方注释中的 2.1 环节
/* 前提 :如果有Spring事务开启,将AutoCommit设置的false,则不执行这个方法
功能概述:
1. 执行此方法时, Seata 框架将AutoCommit设置的false,
目的是 2.1 和 2.2 两个步骤中的所有本地sql同时提交,简单理解就是 业务sql 和 Seata框架的undo_log一起提交。
2. 提交过程可能遇到锁冲突,在遇到锁冲突时,会有重试策略,重试逻辑中有2个逻辑主体:
2.1 .业务sql的执行(构造前后镜像)
2.2 .commit(此时,其内部的重试策略无效),下述逻辑根据上下文是三选一
2.2.1 processGlobalTransactionCommit();
执行分支事务的提交,向TC申请行锁,锁冲突则进入重试逻辑
不冲突执行注册分支事务,提交本地事务,向TC上报结果
2.2.2 processLocalCommitWithGlobalLocks();
申请到全局锁后执行本地提交,这种情况下还需要构造前后镜像嘛?
2.2.3 targetConnection.commit();
直接提交本地事务
*/
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
至此,@GlobalLock 的核心源码已领读完毕。
最后
我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。