当前位置: 首页 > news >正文

Seata源码——TCC模式解析02

初始化

在SpringBoot启动的时候通过自动注入机制将GlobalTransactionScanner注入进ioc而GlobalTransactionScanner继承AbstractAutoProxyCreatorAbstract 在postProcessAfterInitialization阶段由子类创建代理TccActionInterceptor

GlobalTransactionScanner

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// ...// 注册RM、判断是否需要代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // tcc_fence_log清理任务TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);// 代理逻辑TccActionInterceptorinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); }// ...
}

TCC下的Bean类型

TCC模式下有三种特殊的SpringBean。
1.LocalTCC注释接口的Bean:如案例中的LocalTccAction;
2.RPC服务提供方ServiceBean:如Dubbo中被@DubboService注释的服务实现类,如案例中的StorageTccActionImpl;
3.RPC服务消费方ReferenceBean:如Dubbo中被@DubboReference注入的Bean,如案例中的StorageTccAction;
在这里插入图片描述

判断是否需要代理

TCCBeanParserUtils

public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {// dubbo:service 和 LocalTCC 注册为 RMboolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);if (isRemotingBean) {if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {// LocalTCC 需要被代理 TccActionInterceptorreturn isTccProxyTargetBean(remotingDesc); // 1} else {// dubbo:service(ServiceBean) 不需要被代理return false; // 2}} else {if (remotingDesc == null) {if (isRemotingFactoryBean(bean, beanName, applicationContext)) {// dubbo:reference(Dubbo ReferenceBean) 需要被代理 TccActionInterceptorremotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);return isTccProxyTargetBean(remotingDesc); // 3} else {return false;}} else {return isTccProxyTargetBean(remotingDesc);}}
}

isTccProxyTargetBean判断LocalTCC和ReferenceBean具体是否会被代理,只有接口里有TwoPhaseBusinessAction注解方法的类,才会返回true,被TccActionInterceptor拦截。

public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {if (remotingDesc == null) {return false;}boolean isTccClazz = false;Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();Method[] methods = tccInterfaceClazz.getMethods();TwoPhaseBusinessAction twoPhaseBusinessAction;for (Method method : methods) {twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);if (twoPhaseBusinessAction != null) {isTccClazz = true;break;}}if (!isTccClazz) {return false;}short protocols = remotingDesc.getProtocol();if (Protocols.IN_JVM == protocols) {return true; // local tcc}return remotingDesc.isReference(); // dubbo:reference
}

注册为RM

识别所有LocalTCC和ServiceBean中被TwoPhaseBusinessAction注解标注的方法,每个TwoPhaseBusinessAction注解的方法都作为一个TCCResource注册到TC。

TCCBeanParserUtils

    protected static boolean parserRemotingServiceInfo(Object bean, String beanName) {RemotingParser remotingParser = DefaultRemotingParser.get().isRemoting(bean, beanName);if (remotingParser != null) {return DefaultRemotingParser.get().parserRemotingServiceInfo(bean, beanName, remotingParser) != null;}return false;}

DefaultRemotingParser

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);if (remotingBeanDesc == null) {return null;}remotingServiceMap.put(beanName, remotingBeanDesc);Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();Method[] methods = interfaceClass.getMethods();if (remotingParser.isService(bean, beanName)) {// localTcc or ServiceBeantry {Object targetBean = remotingBeanDesc.getTargetBean();for (Method m : methods) {TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);// 所有TwoPhaseBusinessAction注解标注的方法注册为一个Resourceif (twoPhaseBusinessAction != null) {TCCResource tccResource = new TCCResource();tccResource.setActionName(twoPhaseBusinessAction.name());tccResource.setTargetBean(targetBean);tccResource.setPrepareMethod(m);tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),twoPhaseBusinessAction.commitArgsClasses()));tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),twoPhaseBusinessAction.rollbackArgsClasses()));tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),twoPhaseBusinessAction.commitArgsClasses()));tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),twoPhaseBusinessAction.rollbackArgsClasses()));//注册到TCDefaultResourceManager.get().registerResource(tccResource);}}} catch (Throwable t) {throw new FrameworkException(t, "parser remoting service error");}}if (remotingParser.isReference(bean, beanName)) {remotingBeanDesc.setReference(true);}return remotingBeanDesc;
}

一阶段(Try)

TccActionInterceptor会拦截所有标注了TwoPhaseBusinessAction注解的方法执行invoke方法执行一阶段处理

一阶段其实就做了三件事
1.创建BusinessContext
2.将BusinessContext添加到上下文中让后续的Commit和Rollback能拿到数据
3.创建分支事务

// TccActionInterceptor
private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {return invocation.proceed();}Method method = getActionInterfaceMethod(invocation);TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);if (businessAction != null) {String xid = RootContext.getXID();BranchType previousBranchType = RootContext.getBranchType();if (BranchType.TCC != previousBranchType) {RootContext.bindBranchType(BranchType.TCC);}try {return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,invocation::proceed);} finally {if (BranchType.TCC != previousBranchType) {RootContext.unbindBranchType();}MDC.remove(RootContext.MDC_KEY_BRANCH_ID);}}return invocation.proceed();
}

ActionInterceptorHandler

    public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,Callback<Object> targetCallback) throws Throwable {Map<String, Object> ret = new HashMap<>(4);//TCC nameString actionName = businessAction.name();//创建BusinessActionContext BusinessActionContext actionContext = new BusinessActionContext();//设置全局事务idactionContext.setXid(xid);//设置事务唯一名称 这里是从@TwoPhaseBusinessAction注解里面的name拿过来的actionContext.setActionName(actionName);//注册分支事务String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);//设置分支事务idactionContext.setBranchId(branchId);//MDC put branchIdMDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);//设置BusinessActionContext属性信息Class<?>[] types = method.getParameterTypes();int argIndex = 0;for (Class<?> cls : types) {if (cls.getName().equals(BusinessActionContext.class.getName())) {arguments[argIndex] = actionContext;break;}argIndex++;}//the final parameters of the try methodret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);//执行业务方法,即try方法ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());return ret;}

BusinessActionContext 信息

public class BusinessActionContext implements Serializable {private static final long serialVersionUID = 6539226288677737991L;// 全局事务idprivate String xid;// 分支事务idprivate String branchId;// @TwoPhaseBusinessAction.nameprivate String actionName;// actionContextprivate Map<String, Object> actionContext;} 

actionContext存储了包括:try方法名(sys::prepare)、commit方法名(sys::commit)、rollback方法名(sys::rollback)、actionName(@TwoPhaseBusinessAction.name)、是否开启tccFence(@TwoPhaseBusinessAction.useTCCFence)、参数名称和参数值。

注册分支事务

    protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,BusinessActionContext actionContext) {String actionName = actionContext.getActionName();String xid = actionContext.getXid();//获取actionContext信息Map<String, Object> context = fetchActionRequestContext(method, arguments);context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());//初始化 BusinessContextinitBusinessContext(context, method, businessAction);//初始化上下文initFrameworkContext(context);//设置上下文信息actionContext.setActionContext(context);//init applicationDataMap<String, Object> applicationContext = new HashMap<>(4);applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);String applicationContextStr = JSON.toJSONString(applicationContext);try {//注册分支事务Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,applicationContextStr, null);return String.valueOf(branchId);} catch (Throwable t) {String msg = String.format("TCC branch Register error, xid: %s", xid);LOGGER.error(msg, t);throw new FrameworkException(t, msg);}}

初始化BusinessContext

将TwoPhaseBusinessAction 注解的参数放入上下文中

    protected void initBusinessContext(Map<String, Object> context, Method method,TwoPhaseBusinessAction businessAction) {if (method != null) {//the phase one method namecontext.put(Constants.PREPARE_METHOD, method.getName());}if (businessAction != null) {//the phase two method namecontext.put(Constants.COMMIT_METHOD, businessAction.commitMethod());context.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod());context.put(Constants.ACTION_NAME, businessAction.name());}}

初始化上下文

将本地IP放入上下文中

    protected void initFrameworkContext(Map<String, Object> context) {try {context.put(Constants.HOST_NAME, NetUtil.getLocalIp());} catch (Throwable t) {LOGGER.warn("getLocalIP error", t);}}

注册分支事务

RM进行分支事务的注册

RM进行分支事务的注册
AbstractResourceManager

    @Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);request.setBranchType(branchType);request.setApplicationData(applicationData);BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));}return response.getBranchId();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);}}
TC处理分支事务注册请求

AbstractCore

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,String applicationData, String lockKeys) throws TransactionException {// Step1 根据xid查询global_table得到GlobalSessionGlobalSession globalSession = assertGlobalSessionNotNull(xid, false);// 对于存储模式=file的情况,由于GlobalSession在内存中,所以需要获取锁后再执行// 对于存储模式=db/redis的情况,不需要获取锁return SessionHolder.lockAndExecute(globalSession, () -> {// 状态校验 必须为beginglobalSessionStatusCheck(globalSession);globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,applicationData, lockKeys, clientId);MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));// Step2 获取全局锁(只有AT模式需要)branchSessionLock(globalSession, branchSession);try {// Step3 保存分支事务globalSession.addBranch(branchSession);} catch (RuntimeException ex) {branchSessionUnlock(branchSession);throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),branchSession.getBranchId()), ex);}return branchSession.getBranchId();});
}

二阶段Commit

TM发起全局事务提交

当Try处理完之后 TM发起GlobalCommitRequest给TC,TC负责执行每个分支事务提交 这里在AT模式里面讲过 不知道的回看

TC处理二阶段提交

public GlobalStatus commit(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {if (globalSession.getStatus() == GlobalStatus.Begin) {// 如果分支事务存在AT模式,先释放全局锁,delete from lock_table where xid = ?globalSession.closeAndClean();// 如果分支事务都是AT模式,则可以执行异步提交if (globalSession.canBeCommittedAsync()) {// 执行异步提交,更新全局事务状态为AsyncCommitting,update global_table set status = AsyncCommitting where xid = ?globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {// TCCglobalSession.changeGlobalStatus(GlobalStatus.Committing);return true;}}return false;});if (shouldCommit) { // 同步提交(TCC)boolean success = doGlobalCommit(globalSession, false);if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {globalSession.asyncCommit();return GlobalStatus.Committed;} else {return globalSession.getStatus();}} else { // 异步提交(AT)return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();}
}

1.执行全局事务提交核心逻辑,如果二阶段提交失败,会重试至成功为止。
2.因为这个方法也是AT模式用的 所以假如都是AT模式会被异步调用最后是释放锁 删除分支事务 删除undo_log 删除全局事务 自此提交结束 这个在前面讲过
3.假如是AT和TCC混合使用AT模式的分支事务会在异步任务中再次执行doGlobalCommit异步提交,TCC模式的分支事务还是会在第一次调用doGlobalCommit时同步提交,如果中间存在分支事务提交失败,会异步重试直至成功。

DefaultCore

public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;if (globalSession.isSaga()) {success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {// AT模式和TCC模式共存的情况下,AT模式跳过同步提交,只对TCC模式分支事务同步提交if (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}try {// Step1 发送BranchCommitRequest给RM,AT模式RM会删除undo_log,TCC模式RM执行二阶段提交BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Committed:// Step2 删除branch_table中的分支事务记录SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_CommitFailed_Unretryable: // 不可重试(XA中有实现)return false;default:if (!retrying) {// 更新全局事务为二阶段提交重试状态,异步重试至成功位置globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {return CONTINUE;} else {return false;}}} catch (Exception ex) {if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}// 某个分支事务处理失败,继续处理后续分支事务return CONTINUE;});// 如果是同步提交,某个分支事务处理失败,直接返回falseif (result != null) {return result;}if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {return false;}if (!retrying) {globalSession.setStatus(GlobalStatus.Committed);}}if (success && globalSession.getBranchSessions().isEmpty()) {// Step3 删除全局事务 delete from global_table where xid = ?SessionHelper.endCommitted(globalSession, retrying);}return success;
}

向RM发送分支事务提交请求

AbstractCore

protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,BranchSession branchSession) throws IOException, TimeoutException {BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);return response.getBranchStatus();
}// AbstractNettyRemotingServer
public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException {// 定位客户端ChannelChannel channel = ChannelManager.getChannel(resourceId, clientId);if (channel == null) {throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);}RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());

获取客户端Channel

对于LocalTCC或者AT模式,分支事务注册与提交是同一个服务实例,通过resourceId+applicationId+ip+port一般就能定位到二阶段通讯的服务实例,但是可能对应服务宕机或者宕机后重连,这边会降级去找同一个ip不同port的,或者同一个applicationId的不同ip:port。
对于TCC模式下二阶段要找ServiceBean服务提供方的情况,直接进入Step2-fallback,找同一个resourceId下的其他applicationId注册的RM,这里就能找到storage-service进行二阶段提交,所以resourceId(actionName)最好全局唯一。

ChannelManager

public static Channel getChannel(String resourceId, String clientId) {Channel resultChannel = null;String[] clientIdInfo = readClientId(clientId);String targetApplicationId = clientIdInfo[0];String targetIP = clientIdInfo[1];int targetPort = Integer.parseInt(clientIdInfo[2]);ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);// Step1 根据resourceId找对应applicationId-ip-port对应channelif (targetApplicationId == null || applicationIdMap == null ||  applicationIdMap.isEmpty()) {return null;}ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);// Step2 根据BranchSession注册的applicationId应用if (ipMap != null && !ipMap.isEmpty()) {// Step3 根据BranchSession注册的ipConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {// Step4 根据BranchSession注册的portRpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);if (exactRpcContext != null) {Channel channel = exactRpcContext.getChannel();if (channel.isActive()) {resultChannel = channel;}}// Step4-fallback 可能原始channel关闭了,遍历BranchSession注册的ip对应的其他port(resourceId+applicationId+ip)if (resultChannel == null) {for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {Channel channel = portMapOnTargetIPEntry.getValue().getChannel();if (channel.isActive()) {resultChannel = channel;break;} }}}// Step3-fallback BranchSession注册的ip没有对应Channel,从resourceId+applicationId找对应channelif (resultChannel == null) {for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap.entrySet()) {if (ipMapEntry.getKey().equals(targetIP)) { continue; }ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {continue;}for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {Channel channel = portMapOnOtherIPEntry.getValue().getChannel();if (channel.isActive()) {resultChannel = channel;break;} }if (resultChannel != null) { break; }}}}// Step2-fallback BranchSession注册的applicationId没有对应channel,从resourceId中找一个Channelif (resultChannel == null) {resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);}return resultChannel;}

分支事务提交请求

AbstractNettyRemotingServer

 ......return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());

RM处理分支事务提交

TCCResourceManager

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// Step1 从本地缓存tccResourceMap中定位到资源对应本地commit方法TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);Object targetTCCBean = tccResource.getTargetBean();Method commitMethod = tccResource.getCommitMethod();try {// Step2 反序列化BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);// Step3 解析commit方法入参列表Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);Object ret;boolean result;// Step4 执行commit方法 也就相当于执行到了业务指定的commit方法if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {// Step4-1 开启useTCCFencetry {result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {throw e.getCause();}} else {// Step4-2 未开启useTCCFenceret = commitMethod.invoke(targetTCCBean, args);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}} else {result = true;}}//如果处理正常返回二阶段已提交 如果异常返回分支事务二阶段提交失败重试return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;} catch (Throwable t) {return BranchStatus.PhaseTwo_CommitFailed_Retryable;}
}

二阶段回滚

TM发起二阶段回滚请求

这个和AT那里的差不多 当TCC里面的try业务代码异常会触发二阶段的回滚

TC处理二阶段回滚请求

DefaultCore

public GlobalStatus rollback(String xid) throws TransactionException {GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {globalSession.close();if (globalSession.getStatus() == GlobalStatus.Begin) {// 将全局锁lock_table状态更新为Rollbacking // 将全局事务global_table状态更新为RollbackingglobalSession.changeGlobalStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}// 执行全局回滚boolean rollbackSuccess = doGlobalRollback(globalSession, false);return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}

DefaultCore

public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;//遍历分支事务Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {// Step1 发送BranchRollbackRequestBranchStatus branchStatus = branchRollback(globalSession, branchSession);switch (branchStatus) {case PhaseTwo_Rollbacked:// Step2-1 释放全局锁,删除分支事务SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;case PhaseTwo_RollbackFailed_Unretryable: // 回滚失败且无法重试成功SessionHelper.endRollbackFailed(globalSession, retrying);return false;default:// Step2-2 如果RM回滚失败 全局事务状态变为RollbackRetrying 等待重试if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {if (!retrying) {// 如果Step1或Step2步骤异常 全局事务状态变为RollbackRetrying 等待重试globalSession.queueToRetryRollback();}throw new TransactionException(ex);}});// 如果存在一个分支事务回滚失败,则返回falseif (result != null) {return result;}// Step3// 对于file模式,直接删除全局事务// 对于db/redis模式,异步再次执行doGlobalRollback,这里不做任何处理//  防止由于各种网络波动造成分支事务注册成功lock_table和branch_table中始终有残留数据//  导致全局锁一直被占用,无法释放if (success) {SessionHelper.endRollbacked(globalSession, retrying);}return success;
}

SessionHelper

public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {// 如果是重试 或 file模式if (retryGlobal || !DELAY_HANDLE_SESSION) {long beginTime = System.currentTimeMillis();GlobalStatus currentStatus = globalSession.getStatus();boolean retryBranch =currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;if (isTimeoutGlobalStatus(currentStatus)) {globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);} else {globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);}// 删除全局事务global_tableglobalSession.end();}
}

RM处理分支事务回滚

TCCResourceManager

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// Step1 从本地缓存tccResourceMap中定位到资源对应本地rollback方法TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);Object targetTCCBean = tccResource.getTargetBean();Method rollbackMethod = tccResource.getRollbackMethod();try {// Step2 反序列化BusinessActionContext//BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);// Step3 解析rollback方法入参列表Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);Object ret;boolean result;// Step4 执行rollback方法if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {try {result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,args, tccResource.getActionName());} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {throw e.getCause();}} else {ret = rollbackMethod.invoke(targetTCCBean, args);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}} else {result = true;}}return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;} catch (Throwable t) {return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}
}

相关文章:

Seata源码——TCC模式解析02

初始化 在SpringBoot启动的时候通过自动注入机制将GlobalTransactionScanner注入进ioc而GlobalTransactionScanner继承AbstractAutoProxyCreatorAbstract 在postProcessAfterInitialization阶段由子类创建代理TccActionInterceptor GlobalTransactionScanner protected Obje…...

缓存-Redis

Springboot使用Redis 引入pom依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency>在application.yml、application-dev.yml中配置Redis的访…...

PADS Layout安全间距检查报错

问题&#xff1a; 在Pads Layout完成layout后&#xff0c;进行工具-验证设计安全间距检查时&#xff0c;差分对BAK_FIXCLK_100M_P / BAK_FIXCLK_100M_N的安全间距检查报错&#xff0c;最小为3.94mil&#xff0c;但是应该大于等于5mil&#xff1b;如下两张图&#xff1a; 检查&…...

ebpf基础篇(二) ----- ebpf前世今生

bpf 要追述ebpf的历史,就不得不提bpf. bpf(Berkeley Packet Filter)从早(1992年)诞生于类Unix系统中,用于数据包分析. 它提供了数据链路层的接口,可以在数据链路层发送和接收数据.如果网卡支持混杂模式,所有的数据包都可以被接收,即使这些数据包的目的地址是其它主机. BPF最为…...

我的一天:追求专业成长与生活平衡

早晨的序幕&#xff1a;奋斗的开始 今天的一天始于清晨的6点47分。实现了昨天的早睡早起的蜕变计划。洗漱完成之后&#xff0c;7点17分出门&#xff0c;7点33分我抵达公司&#xff0c;为新的一天做好准备。7点52分&#xff0c;我开始我的学习之旅。正如我所体会的&#xff0c;“…...

【动态规划】斐波那契数列模型

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析&#xff08;3&#xff09; 前言 算法原理 1.状态表示 是什么&#xff1f;dp表(一维数组…...

机器人运动学分析与动力学分析主要作用

机器人运动学分析和动力学分析是两个重要的概念&#xff0c;它们在研究和设计工业机器人时起着关键作用。 1. 机器人运动学分析&#xff1a; 机器人运动学是研究机器人运动的科学&#xff0c;它涉及机器人的位置、速度、加速度和轨迹等方面。机器人运动学分析主要包括正解和逆…...

【Java 基础】33 JDBC

文章目录 1. 数据库连接1&#xff09;加载驱动2&#xff09;建立连接 2. 常见操作1&#xff09;创建表2&#xff09;插入数据3&#xff09;查询数据4&#xff09;使用 PreparedStatement5&#xff09;事务管理 3. 注意事项总结 Java Database Connectivity&#xff08;JDBC&…...

Unity中Shader缩放矩阵

文章目录 前言一、直接相乘缩放1、在属性面板定义一个四维变量&#xff0c;用xyz分别控制在xyz轴上的缩放2、在常量缓存区申明该变量3、在顶点着色器对其进行相乘&#xff0c;来缩放变换4、我们来看看效果 二、使用矩阵乘法代替直接相乘缩放的原理1、我们按如下格式得到缩放矩阵…...

Nessus详细安装-windows (保姆级教程)

Nessus描述 Nessus 是一款广泛使用的网络漏洞扫描工具。它由 Tenable Network Security 公司开发&#xff0c;旨在帮助组织评估其计算机系统和网络的安全性。 Nessus 可以执行自动化的漏洞扫描&#xff0c;通过扫描目标系统、识别和评估可能存在的安全漏洞和弱点。它可以检测…...

Stream流的简单使用

stream流的三类方法 获取Stream流 ○ 创建一条流水线,并把数据放到流水线上准备进行操作中间方法 ○ 流水线上的操作 ○ 一次操作完毕之后,还可以继续进行其他操作终结方法 ○ 一个Stream流只能有一个终结方法 ○ 是流水线上的最后一个操作 其实Stream流非常简单&#xff0c;只…...

智能优化算法应用:基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于蛇优化算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.蛇优化算法4.实验参数设定5.算法结果6.参考文…...

vue和react diff的详解和不同

diff算法 简述&#xff1a;第一次对比真实dom和虚拟树之间的同层差别&#xff0c;后面为对比新旧虚拟dom树之间的同层差别。 虚拟dom 简述&#xff1a;js对象形容模拟真实dom 具体&#xff1a; 1.虚拟dom是存在内存中的js对象&#xff0c;利用内存的高效率运算。虚拟dom属…...

智能优化算法应用:基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鹈鹕算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鹈鹕算法4.实验参数设定5.算法结果6.参考文献7.MA…...

10:IIC通信

1&#xff1a;IIC通信 I2C总线&#xff08;Inter IC BUS&#xff09; 是由Philips公司开发的一种通用数据总线&#xff0c;应用广泛&#xff0c;下面是一些指标参数&#xff1a; 两根通信线&#xff1a;SCL&#xff08;Serial Clock&#xff0c;串行时钟线&#xff09;、SDA&a…...

互联网上门洗衣洗鞋小程序优势有哪些?

互联网洗鞋店小程序相较于传统洗鞋方式&#xff0c;具有以下优势&#xff1b; 1. 便捷性&#xff1a;用户只需通过手机即可随时随地下单并查询&#xff0c;省去了许多不必要的时间和精力。学生们无需走出宿舍或校园&#xff0c;就能轻松预约洗鞋并取件。 2. 精准定位&#xff1…...

Java中如何优雅地根治null值引起的Bug问题

1. Java对象为null会引发的问题 NullPointerException&#xff1a;当你尝试调用或访问一个null对象的属性或方法时&#xff0c;Java会抛出NullPointerException异常。例如&#xff0c;如果你有一个名为person的变量&#xff0c;它被设置为null&#xff0c;然后你尝试调用perso…...

C# WPF上位机开发(子窗口通知父窗口更新进度)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 这两天在编写代码的时候&#xff0c;正好遇到一个棘手的问题&#xff0c;解决之后感觉挺有意义的&#xff0c;所以先用blog记录一下&#xff0c;后…...

XUbuntu22.04之跨平台容器格式工具:MKVToolNix(二百零三)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…...

vue中的生命周期和VueComponent实例对象

生命周期 生命周期又叫生命周期钩子&#xff0c;生命周期函数 生命周期是&#xff0c;Vue在关键的时刻帮我们调用的一些特殊名字的函数 生命周期的this指向vm或者组件实例对象 mounted会将初始化的Dom挂载到页面上 <template><div class"hello"><…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案

随着新能源汽车的快速普及&#xff0c;充电桩作为核心配套设施&#xff0c;其安全性与可靠性备受关注。然而&#xff0c;在高温、高负荷运行环境下&#xff0c;充电桩的散热问题与消防安全隐患日益凸显&#xff0c;成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

回溯算法学习

一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...

Linux系统部署KES

1、安装准备 1.版本说明V008R006C009B0014 V008&#xff1a;是version产品的大版本。 R006&#xff1a;是release产品特性版本。 C009&#xff1a;是通用版 B0014&#xff1a;是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存&#xff1a;1GB 以上 硬盘&#xf…...

系统掌握PyTorch:图解张量、Autograd、DataLoader、nn.Module与实战模型

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文通过代码驱动的方式&#xff0c;系统讲解PyTorch核心概念和实战技巧&#xff0c;涵盖张量操作、自动微分、数据加载、模型构建和训练全流程&#…...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机&#xff0c;交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息&#xff0c;系统版本&#xff1a;Ubuntu22.04.5 LTS&#xff1b;内核版本…...

高防服务器价格高原因分析

高防服务器的价格较高&#xff0c;主要是由于其特殊的防御机制、硬件配置、运营维护等多方面的综合成本。以下从技术、资源和服务三个维度详细解析高防服务器昂贵的原因&#xff1a; 一、硬件与技术投入 大带宽需求 DDoS攻击通过占用大量带宽资源瘫痪目标服务器&#xff0c;因此…...

TJCTF 2025

还以为是天津的。这个比较容易&#xff0c;虽然绕了点弯&#xff0c;可还是把CP AK了&#xff0c;不过我会的别人也会&#xff0c;还是没啥名次。记录一下吧。 Crypto bacon-bits with open(flag.txt) as f: flag f.read().strip() with open(text.txt) as t: text t.read…...

新版NANO下载烧录过程

一、序言 搭建 Jetson 系列产品烧录系统的环境需要在电脑主机上安装 Ubuntu 系统。此处使用 18.04 LTS。 二、环境搭建 1、安装库 $ sudo apt-get install qemu-user-static$ sudo apt-get install python 搭建环境的过程需要这个应用库来将某些 NVIDIA 软件组件安装到 Je…...