Seata AT模式的一些常见问题及其源码解析
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
Seata AT
基于两阶段提交协议的演变:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。
AT 模式是一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。
通过日志观察客户端的全局事务处理流程
当微服务B的方法开启了本地事务,在微服务中先后执行了 insert a , update b , 业务逻辑c ,update d。微服务A的方法通过声明式注解@GlobalTransactional开启全局事务,然后通过feign调用微服务B的方法。
微服务A
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [172.17.0.7:8091:72680625000142901]
[ XNIO-1 task-4] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142905, lockKeys:tableName:pkid;tableName:pkid,pkid
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : transaction 172.17.0.8:8091:72680625000142901 will be commit
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : [172.17.0.8:8091:72680625000142901] commit status: Committed
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : transaction end, xid = 172.17.0.8:8091:72680625000142901[_RMROLE_1_26_32] io.seata.rm.AbstractRMHandler : Branch committing: 172.17.0.8:8091:72680625000142901 72680625000142905 jdbc:postgresql://ip:port/schema {"autoCommit":false}
[_RMROLE_1_26_32] i.s.c.r.p.c.RmBranchCommitProcessor : rm client handle branch commit process:BranchCommitRequest{xid='172.17.0.8:8091:72680625000142901', branchId=72680625000142905, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema?currentSchema=schema', applicationData='{"autoCommit":false}'}微服务B
[ XNIO-1 task-7] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142907, lockKeys:tableName:pkid
注意观察分支事务提交时线程的不同,证明是tc异步通知分支事务异步提交的。
1.分支事务内遇到异常,但全局事务不回滚?
当微服务B的方法开启了本地事务,在微服务中先后执行了 insert a , update b , 业务逻辑c ,update d。当某段逻辑(如业务c)执行中出现异常时,本地事务会触发回滚。(视rollbackFor、隔离级别、事务代理、事务上下文等诸多原因决定是否真的会回滚。)
此时,假设微服务A的方法通过声明式注解@GlobalTransactional开启全局事务,然后通过feign调用微服务B的方法,同样的当业务c执行中出现异常时,全局事务却未必回滚。
异常时全局事务处理流程
通过日志观察,分支事务遇到异常后,并没有提交到tc。即tc不会感知到分支事务的异常。
而微服务A没有输出微服务B的异常,正常提交了全局事务。
为什么微服务A没有捕获到微服务B的异常呢?是由于微服务B中配置的RestControllerAdvice捕获了svc的异常。
如下
@ExceptionHandler({BizException.class})
@ResponseBody
@ResponseStatus(HttpStatus.OK)
public Response handleGlobalException(Exception e) {String serviceName = this.environment.getProperty("spring.application.name");String errorMsg = String.format("系统异常,[%s],ex=%s", serviceName, e.getMessage());log.error(errorMsg, e);return Response.failed(e.getMessage());
}
ResponseStatus为200,feign感知不到异常不会报错,微服务A也就无从感知到异常。
调整如下:
@ExceptionHandler({BizException.class})
@ResponseBody
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Response handleGlobalException(Exception e) {String serviceName = this.environment.getProperty("spring.application.name");String errorMsg = String.format("系统异常,[%s],ex=%s", serviceName, e.getMessage());log.error(errorMsg, e);return Response.failed(e.getMessage());
}
微服务A的报错如下
feign.FeignException$InternalServerError: [500 Internal Server Error] during [POST] to [http://serviceName/url] [Client#method(List)]: [{"code":xxxx,"msg":"插入重复数据","data":null,"success":false}]at feign.FeignException.serverErrorStatus(FeignException.java:259)at feign.FeignException.errorStatus(FeignException.java:206)at feign.FeignException.errorStatus(FeignException.java:194)at feign.codec.ErrorDecoder$Default.decode(ErrorDecoder.java:103)at feign.InvocationContext.decodeError(InvocationContext.java:126)at feign.InvocationContext.proceed(InvocationContext.java:72)at feign.ResponseHandler.handleResponse(ResponseHandler.java:63)at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:114)at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:70)at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:99)at org.springframework.cloud.openfeign.FeignCachingInvocationHandlerFactory$1.proceed(FeignCachingInvocationHandlerFactory.java:66)at org.springframework.cache.interceptor.CacheInterceptor.lambda$invoke$0(CacheInterceptor.java:64)at org.springframework.cache.interceptor.CacheAspectSupport.invokeOperation(CacheAspectSupport.java:416)at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:401)at org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:74)at org.springframework.cloud.openfeign.FeignCachingInvocationHandlerFactory.lambda$create$1(FeignCachingInvocationHandlerFactory.java:53)
feign.InvocationContext#proceed

除了调整httpStatus200,也可以通过在微服务A中判断返回对象的状态来决定是否报错,并由全局事务捕获。
异常时的处理逻辑:
全局事务回滚:
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : transaction 172.17.0.7:8091:72652747696657983 will be rollback
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : transaction end, xid = 172.17.0.7:8091:72652747696657983
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : [172.17.0.7:8091:72652747696657983] rollback status: Rollbacked分支事务回滚:
[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.8:8091:72652747696657983', branchId=72652747696657987, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.17.0.8:8091:72652747696657983 72652747696657987 jdbc:postgresql://ip:port/schema[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.17.0.8:8091:72652747696657983 branch 72652747696657987, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:172.17.0.8:8091:72652747696657983, branchId:72652747696657987
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
注意:回滚也是异步执行的。
源码分析
全局事务的处理逻辑
io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction
io.seata.tm.api.TransactionalTemplate#execute// 1. Get transactionInfo 获取注解上的配置TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.GlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation. 隔离级别Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {case NOT_SUPPORTED:// If transaction is existing, suspend it.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend(false);}// Execute without transaction and return.return business.execute();case REQUIRES_NEW:// 当前事务存在,则先挂起;然后创建新事务;// If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend(false);}tx = GlobalTransactionContext.createNew();// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.if (notExistingTransaction(tx)) {return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,else create// 同spring @transactional 隔离级别,默认特性;当前事务存在直接返回,没有则创建;tx = GlobalTransactionContext.getCurrentOrCreate();break;case NEVER:// If transaction is existing, throw exception.if (existingTransaction(tx)) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));} else {// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.if (notExistingTransaction(tx)) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}// Continue and execute with current transaction.break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}// 全局锁配置// set current tx config to holderGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());}try {// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,// else do nothing. Of course, the hooks will still be triggered.// 开启新事务beginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 处理业务代码,上文的场景即在这里处理;rs = business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 捕获异常并完成事务;// 注意是完成,而不是回滚,例如不是所有的异常都要回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.commitTransaction(tx, txInfo);return rs;} finally {//5. clearresumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)throws TransactionalExecutor.ExecutionException, TransactionException {//roll back 判断是回滚还是提交if (txInfo != null && txInfo.rollbackOn(originalException)) {rollbackTransaction(tx, originalException);} else {// not roll back on this exception, so commitcommitTransaction(tx, txInfo);}
}
分支事务的处理逻辑
提交阶段
io.seata.rm.datasource.ConnectionProxy#commit
io.seata.rm.datasource.ConnectionProxy#doCommit
io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
io.seata.rm.AbstractResourceManager#branchRegister
BranchRegisterRequest request = new BranchRegisterRequest();
request.setXid(xid);
request.setLockKey(lockKeys);
request.setResourceId(resourceId);
request.setBranchType(branchType);
request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(),String.format("branch register failed, xid: %s, errMsg: %s ", xid, response.getMsg()));
}
if (LOGGER.isInfoEnabled()) {LOGGER.info("branch register success, xid:{}, branchId:{}, lockKeys:{}", xid, response.getBranchId(), lockKeys);
}
return response.getBranchId();
回滚阶段,tm触发全局回滚后,tc会通过netty异步告知各rm进行回滚
io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler
io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#process // rm回滚
io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback
io.seata.rm.AbstractRMHandler#doBranchRollback
io.seata.rm.datasource.DataSourceManager#branchRollback
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {DataSourceProxy dataSourceProxy = get(resourceId);if (dataSourceProxy == null) {throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));}try {// 基于undo_log回滚UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);if (LOGGER.isInfoEnabled()) {LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);}} catch (TransactionException te) {StackTraceLogger.error(LOGGER, te,"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;} else {return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}}return BranchStatus.PhaseTwo_Rollbacked;
}
2.传播特性Propagation
spring @transactional 传播特性失效?
Spring在TransactionDefinition接口中规定了7种类型的事务传播行为。REQUIRES_NEW 用于在当前事务存在时挂起当前事务,并创建一个新的事务来执行标注了REQUIRES_NEW的方法。这种机制确保了内部事务的独立性,不受外部事务的影响。
然而在全局事务回滚时,REQUIRES_NEW注解的事务也将回滚,因为新开启的新分支事务也被同一全局事务管理。
日志如下:
[ XNIO-1 task-2] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615138, lockKeys:tableName:pkName
-- REQUIRES_NEW 新开启的分支事务
[ XNIO-1 task-2] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615141, lockKeys:tableName:pkName[h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.7:8091:63655768332615130', branchId=63655768332615141, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='{"autoCommit":false}'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.17.0.7:8091:63655768332615130 63655768332615141 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.17.0.7:8091:63655768332615130 branch 63655768332615141, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615141
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked[h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.7:8091:63655768332615130', branchId=63655768332615138, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='{"autoCommit":false}'}
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.17.0.7:8091:63655768332615130 63655768332615138 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_2_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.17.0.7:8091:63655768332615130 branch 63655768332615138, undo_log deleted with GlobalFinished
[h_RMROLE_1_2_16] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:172.17.0.7:8091:63655768332615130, branchId:63655768332615138
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
解决方式,在spring REQUIRES_NEW 之上添加全局事务的传播特性,开启新的全局事务
@GlobalTransactional(rollbackFor = Exception.class, propagation = io.seata.tm.api.transaction.Propagation.REQUIRES_NEW)
@Transactional(rollbackFor = Exception.class, propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW)
// 全局事务A回滚,另一个全局事务并不会回滚
[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : suspending current transaction, xid = 172.17.0.8:8091:72652747696657983[ XNIO-1 task-2] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [172.17.0.8:8091:72652747696657984][h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:BranchRollbackRequest{xid='172.17.0.8:8091:72652747696657983', branchId=72652747696657987, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 172.17.0.8:8091:72652747696657983 72652747696657987 jdbc:postgresql://ip:port/schema
[h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 172.17.0.8:8091:72652747696657983 branch 72652747696657987, undo_log deleted with GlobalFinished
[h_RMROLE_1_1_16] i.seata.rm.datasource.DataSourceManager : branch rollback success, xid:172.17.0.8:8091:72652747696657983, branchId:72652747696657987
[h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked[h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchCommitProcessor : rm client handle branch commit process:BranchCommitRequest{xid='172.17.0.8:8091:72652747696657984', branchId=72652747696657985, branchType=AT, resourceId='jdbc:postgresql://ip:port/schema', applicationData='null'}
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch committing: 172.17.0.8:8091:72652747696657984 72652747696657985 jdbc:postgresql://ip:port/schema null
[h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
再次看io.seata.tm.api.TransactionalTemplate#execute方法里全局事务传播特性的行为
case REQUIRES_NEW:// 区分传播特性// If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder = tx.suspend(false);}tx = GlobalTransactionContext.createNew();
ShouldNeverHappenException
在最初使用全局事务的隔离级别时,方法上只有@GlobalTransactional(rollbackFor = Exception.class, propagation = io.seata.tm.api.transaction.Propagation.REQUIRES_NEW),而没有@Transactional(rollbackFor = Exception.class, propagation = org.springframework.transaction.annotation.Propagation.REQUIRES_NEW),抛出过如下异常,绑定的是新分布式事务id,但当前全局事务id是原id
Cause: java.sql.SQLException: io.seata.common.exception.ShouldNeverHappenException: bind xid: 172.17.0.7:8091:63655768332620147, while current xid: 172.17.0.7:8091:63655768332620141Caused by: io.seata.common.exception.ShouldNeverHappenException: bind xid: 172.17.0.7:8091:63655768332620147, while current xid: 172.17.0.7:8091:63655768332620141at io.seata.rm.datasource.ConnectionContext.bind(ConnectionContext.java:198)at io.seata.rm.datasource.ConnectionProxy.bind(ConnectionProxy.java:85)at io.seata.rm.datasource.exec.BaseTransactionalExecutor.execute(BaseTransactionalExecutor.java:120)at io.seata.rm.datasource.exec.ExecuteTemplate.execute(ExecuteTemplate.java:145)at io.seata.rm.datasource.PreparedStatementProxy.execute(PreparedStatementProxy.java:55)at org.apache.ibatis.executor.statement.PreparedStatementHandler.update(PreparedStatementHandler.java:48)at org.apache.ibatis.executor.statement.RoutingStatementHandler.update(RoutingStatementHandler.java:75)at org.apache.ibatis.executor.SimpleExecutor.doUpdate(SimpleExecutor.java:50)at org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:117)at org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:76)at com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor.intercept(MybatisPlusInterceptor.java:106)at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:197)
seata 通过代理执行sql语句时,检查是否已有xid
org.apache.seata.rm.datasource.ConnectionContext#bind
void bind(String xid) {if (xid == null) {throw new IllegalArgumentException("xid should not be null");}if (!inGlobalTransaction()) {setXid(xid);} else {if (!this.xid.equals(xid)) {throw new ShouldNeverHappenException(String.format("bind xid: %s, while current xid: %s", xid, this.xid));}}
}
跟进org.apache.seata.rm.datasource.ConnectionContext#reset()方法,只有在本地事务提交或回滚后才会清空xid。即同一分支事务不能跨不同的分布式事务。
在执行不同的sql语句时,会根据不同类型的数据库及dml语句来实现抽象类AbstractDMLBaseExecutor,比如insert、update、delete的beforeImage,afterImage有不同的处理逻辑。
3. 全局事务读未提交?
回滚时的undolog 检测
在某次分支事务执行回滚时,曾遇到过如下异常:
[_RMROLE_1_13_32] i.seata.rm.datasource.DataSourceManager : branchRollback failed. branchType:[AT], xid:[172.17.0.8:8091:72641030088531502], branchId:[72641030088531508], resourceId:[jdbc:postgresql://ip:port/schema], applicationData:[{"autoCommit":false}]. reason:[Branch session rollback failed because of dirty undo log, please delete the relevant undolog after manually calibrating the data. xid = 172.17.0.8:8091:72641030088531502 branchId = 72641030088531508]
// 执行回滚
org.apache.seata.rm.AbstractRMHandler#doBranchRollback
io.seata.rm.datasource.DataSourceManager#branchRollback
// 基于undo log进行回滚
org.apache.seata.rm.datasource.undo.AbstractUndoLogManager#undo
org.apache.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn
// 回滚前校验, 如校验当前信息与修改前、修改后信息
org.apache.seata.rm.datasource.undo.AbstractUndoExecutor#dataValidationAndGoOn// 如果当前数据与undo log 中的修改后、修改前日志都不一样,则抛出异常// 这些数据可能被其它并发事务修改了,需要人工维护io.seata.rm.datasource.undo.SQLUndoDirtyExceptionprotected boolean dataValidationAndGoOn(ConnectionProxy conn) throws SQLException {TableRecords beforeRecords = sqlUndoLog.getBeforeImage();TableRecords afterRecords = sqlUndoLog.getAfterImage();// Compare current data with before data// No need undo if the before data snapshot is equivalent to the after data snapshot.Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);if (beforeEqualsAfterResult.getResult()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Stop rollback because there is no data change " +"between the before data snapshot and the after data snapshot.");}// no need continue undo.return false;}// Validate if data is dirty.TableRecords currentRecords = queryCurrentRecords(conn);// compare with current data and after image.Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);if (!afterEqualsCurrentResult.getResult()) {// If current data is not equivalent to the after data, then compare the current data with the before// data, too. No need continue to undo if current data is equivalent to the before data snapshotResult<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);if (beforeEqualsCurrentResult.getResult()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Stop rollback because there is no data change " +"between the before data snapshot and the current data snapshot.");}// no need continue undo.return false;} else {if (LOGGER.isInfoEnabled()) {if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());}}if (LOGGER.isDebugEnabled()) {LOGGER.debug("check dirty data failed, old and new data are not equal, " +"tableName:[" + sqlUndoLog.getTableName() + "]," +"oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +"newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");}throw new SQLUndoDirtyException("Has dirty records when undo.");}}return true;
}
回顾下seata at 模式的执行流程,在一阶段分支事务注册时已经完成本地事务的提交。
// 本地事务提交
org.apache.seata.rm.datasource.ConnectionProxy#commit
org.apache.seata.rm.datasource.ConnectionProxy#doCommit
org.apache.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit private void processGlobalTransactionCommit() throws SQLException {try {// 向tc注册分支事务 register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);// 提交本地事务targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);report(false);throw new SQLException(ex);}if (IS_REPORT_SUCCESS_ENABLE) {// 向tc上报本地提交结果report(true);}// 重试context;避免检查xid抛出shouldNotHappenExpcontext.reset();
}
而回滚时是收到tc通知异步进行的,在这个时间差内如果有其他事务读到了全局事务未提交的数据,即出现了全局事务的读未提交,出现了脏读的情况。
如何实现全局事务的读已提交?
脏写
回顾上文的日志中,曾出现过
[ XNIO-1 task-4] io.seata.rm.AbstractResourceManager : branch register success, xid:172.17.0.8:8091:72680625000142901, branchId:72680625000142905, lockKeys:tableName:pkid;tableName:pkid,pkid
即分支事务提交时,会加锁。
但是如果针对同一张表的改动不在此全局事务的方法中,如何处理呢?
可以在其他方法上也增加全局事务注解@GlobalTransactional 。
如果不需要全局事务,则需要使用@GlobalLock。
无论是@GlobalTransactional还是@GlobalLock,都是在分支事务提交时才能知道锁是否存在,然后决定下一步操作。
脏读
曾有同事遇到过一个场景,在MQ因异常重试时,读到了回滚前的数据认为业务已执行完成而停止了重试逻辑。
因此需要在执行过程中有必要检查全局锁是否存在。
如果执行 SQL 是 select for update,则会使用 SelectForUpdateExecutor 类,如果执行方法中带有 @GlobalTransactional or @GlobalLock注解,则会检查是否有全局锁,如果当前存在全局锁,则会回滚本地事务,通过 while 循环不断地重新竞争获取本地锁和全局锁。
io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute
public T doExecute(Object... args) throws Throwable {Connection conn = statementProxy.getConnection();// ... ...try {// ... ...while (true) {try {// ... ...if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {// Do the same thing under either @GlobalTransactional or @GlobalLock, // that only check the global lock here.statementProxy.getConnectionProxy().checkLock(lockKeys);} else {throw new RuntimeException("Unknown situation!");}break;} catch (LockConflictException lce) {if (sp != null) {conn.rollback(sp);} else {conn.rollback();}// trigger retrylockRetryController.sleep(lce);}}} finally {// ...}
其他的一些问题
时间类型的字段时区不要带时区
[_RMROLE_1_10_32] i.seata.rm.datasource.DataSourceManager : branchRollback failed. branchType:[AT], xid:[172.17.0.8:8091:72641030088531502], branchId:[72641030088531508], resourceId:[jdbc:postgresql://ip:port/schema], applicationData:[{"autoCommit":false}]. reason:[Branch session rollback failed and try again later xid = 172.17.0.8:8091:72641030088531502 branchId = 72641030088531508 Cannot cast an instance of java.sql.Timestamp to type Types.TIMESTAMP_WITH_TIMEZONE]
参考
https://seata.apache.org/zh-cn/docs/dev/mode/at-mode/
https://seata.apache.org/zh-cn/blog/seata-datasource-proxy/
https://seata.apache.org/zh-cn/docs/overview/faq/#42
https://seata.apache.org/zh-cn/blog/seata-at-lock/
org.apache.seata.tm.api.TransactionalTemplate
io.seata.rm.AbstractResourceManager
org.apache.seata.rm.datasource.exec.BaseTransactionalExecutor
子类 AbstractDMLBaseExecutor、SelectForUpdateExecutor
org.apache.seata.server.coordinator.DefaultCore
相关文章:
Seata AT模式的一些常见问题及其源码解析
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。 Seata AT 基于两阶段提交协议的演变: 一阶段:业…...
华为GaussDB数据库的手动备份与还原操作介绍
数据库的备份以A机上的操作为例。 1、使用linux的root用户登录到GaussDB服务器。 2、用以下命令切换到 GaussDB 管理员用户,其中,omm 为当前数据库的linux账号。 su - omm 3、执行gs_dump命令进行数据库备份: 这里使用gs_dump命令进行备…...
2025年3月29日(matlab -ss -lti)
线性时不变系统(LTI系统)的定义与核心特性 线性时不变系统(Linear Time-Invariant System)是信号与系统分析中的基础模型,其核心特性包括线性和时不变性。以下从定义、验证方法和应用场景展开说明: 1. 线性…...
网络原理-TCP/IP
网络原理学习笔记:TCP/IP 核心概念 本文是我在学习网络原理时整理的笔记,主要涵盖传输层、网络层和数据链路层的核心协议和概念,特别是 TCP, UDP, IP, 和以太网。 一、传输层 (Transport Layer) 传输层负责提供端到端(进程到进…...
服务器磁盘卷组缓存cache设置介绍
工具1: storcli a. 确认软件包是否安装 [rootlocalhost ~]#rpm -qa | grep storcli storcli-1.21.06-1.noarch 备注:若检索结果为空,需要安装对应的软件安装包。安装命令如下: #rpm -ivh storcli-xx-xx-1.noarch.rpm b. 查看逻辑…...
Unity顶点优化:UV Splits与Smoothing Splits消除技巧
一、顶点分裂问题概述 1. 什么是顶点分裂 顶点分裂(Vertex Splits)是3D渲染中常见的性能问题,当模型需要为同一顶点位置存储不同属性值时,会创建多个顶点副本。主要分为两类: UV Splits:由UV不连续引起 Smoothing Splits&#…...
第五十三章 Spring之假如让你来写Boot——环境篇
Spring源码阅读目录 第一部分——IOC篇 第一章 Spring之最熟悉的陌生人——IOC 第二章 Spring之假如让你来写IOC容器——加载资源篇 第三章 Spring之假如让你来写IOC容器——解析配置文件篇 第四章 Spring之假如让你来写IOC容器——XML配置文件篇 第五章 Spring之假如让你来写…...
Router [Continuation Settings]
楼上网络CMCC-Wmew,楼下接收不到,可能因为喜好弱,再弄一台路由器中转一下 Router [Continuation Settings] 路由器中续设置 到这里这台K3的路由器设置完成了,作为转发,中续,她还需要设置上游路由器&#…...
Zookeeper中的Zxid是如何设计的
想获取更多高质量的Java技术文章?欢迎访问Java技术小馆官网,持续更新优质内容,助力技术成长 Java技术小馆官网https://www.yuque.com/jtostring Zookeeper中的Zxid是如何设计的 如果你们之前学习过 ZooKeeper,你们可能已经了解…...
蓝桥云客 岛屿个数
0岛屿个数 - 蓝桥云课 问题描述 小蓝得到了一副大小为 MN 的格子地图,可以将其视作一个只包含字符 0(代表海水)和 1(代表陆地)的二维数组,地图之外可以视作全部是海水,每个岛屿由在上/下/左/右…...
深度学习篇---paddleocr正则化提取
文章目录 前言一、代码总述&介绍1.1导入必要的库1.1.1cv21.1.2re1.1.3paddleocr 1.2初始化PaddleOCR1.3打开摄像头1.4使用 PaddleOCR 进行识别1.5定义正则表达式模式1.6打印提取结果1.7异常处理 二、正则表达式2.1简介2.2常用正则表达式模式及原理2.2.1. 快递单号模式2.2.2…...
Android 蓝牙/Wi-Fi通信协议之:低功耗蓝牙(BLE 4.0+)介绍
介绍:蓝牙通信协议详解 1. 蓝牙协议分层 Android主要支持**经典蓝牙(Bluetooth Classic)和低功耗蓝牙(BLE)**两种模式: 经典蓝牙(BT 2.1/3.0) 低功耗蓝牙(BLE 4.0&…...
流影---开源网络流量分析平台(四)(分析引擎部署)
目录 功能介绍 部署过程 一、安装依赖环境 二、源码编译部署 三、运行环境配置 四、运行配置 功能介绍 本章我将继续安装流影的分析引擎组件首先,ly_analyser是流影的威胁行为分析引擎,读取netflow v9格式的数据作为输入,运行各种威胁行…...
31天Python入门——第14天:异常处理
你好,我是安然无虞。 文章目录 异常处理1. Python异常2. 异常捕获try-except语句捕获所有的异常信息获取异常对象finally块 3. raise语句4. 自定义异常5. 函数调用里面产生的异常补充练习 异常处理 1. Python异常 Python异常指的是在程序执行过程中发生的错误或异…...
浅析Android Jetpack ACC之LiveData
一、Android Jetpack简介 Android官网对Jetpack的介绍如下: Jetpack is a suite of libraries to help developers follow best practices, reduce boilerplate code, and write code that works consistently across Android versions and devices so that develo…...
【区块链安全 | 第十五篇】类型之值类型(二)
文章目录 值类型有理数和整数字面量(Rational and Integer Literals)字符串字面量和类型(String Literals and Types)Unicode 字面量(Unicode Literals)十六进制字面量(Hexadecimal Literals&am…...
深度学习篇---模型训练评估参数
文章目录 前言一、Precision(精确率)1.1定义1.2意义1.3数值接近11.4数值再0.5左右1.5数值接近0 二、Recall(召回率)2.1定义2.2意义2.3数值接近12.4数值在0.5左右2.5数值接近0 三、Accuracy(准确率)3.1定义3…...
SQL Server 可用性组自动种子设定失败问题
目录标题 SQL Server 可用性组自动种子设定失败问题笔记一、问题背景二、错误日志分析错误信息错误代码与分析 三、自动种子设定概述(同上,无需修改)四、解决步骤1. 备份主数据库2. 在辅助副本上恢复数据库3. 重新启动自动种子设定 SQL Serve…...
02 相机标定相关坐标系
标定相关坐标系 一共四个坐标系 图像像素坐标系: u-v,图像左上角为原点图像物理坐标系: x-y,图像中心为原点...
Ubuntu修改用户名
修改用户名: 1.CTRL ALT T 快捷键打开终端,输入‘sudo su’ 转为root用户。 2.输入‘ gredit /etc/passwd ’,修改用户名,只修改用户名,后面的全名、目录等不修改。 3.输入 ‘ gedit /etc/shadow ’ 和 ‘ gedit /etc/…...
Windows 系统下多功能免费 PDF 编辑工具详解
IceCream PDF Editor是一款极为实用且操作简便的PDF文件编辑工具,它完美适配Windows操作系统。其用户界面设计得十分直观,哪怕是初次接触的用户也能快速上手。更为重要的是,该软件具备丰富多样的强大功能,能全方位满足各类PDF编辑…...
UE学习记录part11
第14节 breakable actors 147 destructible meshes a geometry collection is basically a set of static meshes that we get after we fracture a mesh. 几何体集合基本上是我们在断开网格后获得的一组静态网格。 选中要破碎的网格物品,创建集合 可以选择不同的…...
Redis-07.Redis常用命令-集合操作命令
一.集合操作命令 SADD key member1 [member2]: sadd set1 a b c d sadd set1 a 0表示没有添加成功,因为集合中已经有了这个元素了,因此无法重复添加。 SMEMBERS key: smembers set1 SCARD key: scard set1 SADD key member1 …...
vscode 源代码管理
https://code.visualstudio.com/updates/v1_92#_source-control 您可以通过切换 scm.showHistoryGraph 设置来禁用传入/传出更改的图形可视化。...
arm64位FFmpeg与X264库
参考链接: https://blog.csdn.net/gitblog_09700/article/details/142945092...
iOS审核被拒:Missing privacy manifest 第三方库添加隐私声明文件
问题: iOS提交APP审核被拒,苹果开发者网页显示二进制错误,收到的邮件显示的详细信息如下图: 分析: 从上面信息能看出第三方SDK库必须要包含一个隐私文件,去第三方库更新版本。 几经查询资料得知,苹果在…...
用mkdocs写文档#自动更新github-page
https://wuyisheng.github.io/blog 背景是上一篇博客 使用mkdocs,最后提及可以部署github page。这里说明下怎么自动部署。 当然,这篇博客主要的目的还是提供下github page的链接 :) 我是这样做的: step 1: pip3 i…...
【LeetCode Solutions】LeetCode 101 ~ 105 题解
CONTENTS LeetCode 101. 对称二叉树(简单)LeetCode 102. 二叉树的层序遍历(中等)LeetCode 103. 二叉树的锯齿形层序遍历(中等)LeetCode 104. 二叉树的最大深度(简单)LeetCode 105. 从…...
Orpheus-TTS 介绍,新一代开源文本转语音
Orpheus-TTS 是由 Canopy Labs 团队于2025年3月19日发布的开源文本转语音(TTS)模型,其技术突破集中在超低延迟、拟人化情感表达与实时流式生成三大领域。以下从技术架构、核心优势、应用场景、对比分析、开发背景及最新进展等多维度展开深入解…...
Java数据结构-栈和队列
目录 1. 栈(Stack) 1.1 概念 1.2 栈的使用 1.3 栈的模拟实现 1.4 栈的应用场景 1. 改变元素的序列 2. 将递归转化为循环 3. 括号匹配 4. 逆波兰表达式求值 5. 出栈入栈次序匹配 6. 最小栈 1.5 概念区分 2. 队列(Queue) 2.1 概念 2.2 队列的使用 2.3 队列模拟实…...
