前言 在之前的博客中,已经介绍过了TM在seataAT模式中的处理流程、TC在seata分布式事务中的主要任务以及RM在seataAT模式中的sql语句执行流程,下面我们通过源码讲述分布式事务是如何实现提交或回滚的。分支事务的提交或回滚 在seataAT模式中,只有当所有的分支事务全部成功提交后,才会触发分布式事务的提交:publicclassConnectionProxyextendsAbstractConnectionProxy{privatevoidprocessGlobalTransactionCommit()throwsSQLException{try{注册分支事务,添加行锁,其实就是分布式锁register();}catch(TransactionExceptione){recognizeLockKeyConflictException(e,context。buildLockKeys());}try{插入undologUndoLogManagerFactory。getUndoLogManager(this。getDbType())。flushUndoLogs(this);提交本地事务targetConnection。commit();}catch(Throwableex){LOGGER。error(processconnectionProxycommiterror:{},ex。getMessage(),ex);上报分支事务提交失败状态report(false);抛出异常,最终会被TM捕捉到,触发分布式事务的回滚thrownewSQLException(ex);}if(ISREPORTSUCCESSENABLE){上报分支事务成功提交状态report(true);}恢复现场context。reset();}} 在上述源码分析中,当所有的RM分支事务提交成功后,TC会接收到所有RM分支事务的状态,代码最终会执行到TM的模版逻辑中。TM触发分布式事务的提交或回滚 TM模版代码:publicObjectexecute(TransactionalExecutorbusiness)throwsThrowable{1。拿到整理好的GlobalTransactional注解里面的配置信息TransactionInfotxInfobusiness。getTransactionInfo();if(txInfonull){thrownewShouldNeverHappenException(transactionInfodoesnotexist);}1。1获取当前的分布式事务,如果为null的话,说明这是分布式事务的发起者;如果不为null,说明这是分布式事务的参与者GlobalTransactiontxGlobalTransactionContext。getCurrent();1。2获取分布式事务的传播级别,其实就是按照spring的传播级别来一套,区别就是spring事务是本地事务,这是分布式事务,原理都一样PropagationpropagationtxInfo。getPropagation();SuspendedResourcesHoldersuspendedResourcesHtry{这个switch里面全都是处理分布式事务传播级别的switch(propagation){如果不支持分布式事务,如果当前存在事务,那么先挂起当前的分布式事务,再执行业务逻辑caseNOTSUPPORTED:分布式事务存在,先挂起if(existingTransaction(tx)){suspendedResourcesHoldertx。suspend();}执行业务逻辑returnbusiness。execute();如果是每次都要创建一个新的分布式事务,先把当前存在的分布式事务挂起,然后创建一个新分布式事务caseREQUIRESNEW:如果分布式事务存在,先挂起当前分布式事务,再创建一个新的分布式事务if(existingTransaction(tx)){suspendedResourcesHoldertx。suspend();txGlobalTransactionContext。createNew();}之所以用break,是为了后面的代码和其他的传播级别一起共用,业务逻辑肯定还是要执行的如果支持分布式事务,如果当前不存在分布式事务,那么直接执行业务逻辑,否则以分布式事务的方式执行业务逻辑caseSUPPORTS:如果不存在分布式事务,直接执行业务逻辑if(notExistingTransaction(tx)){returnbusiness。execute();}否则,以分布式事务的方式执行业务逻辑如果有分布式事务,就在当前分布式事务下执行业务逻辑,否则创建一个新的分布式事务执行业务逻辑caseREQUIRED:Ifcurrenttransactionisexisting,executewithcurrenttransaction,elsecontinueandexecutewithnewtransaction。如果不允许有分布式事务,那么一旦发现存在分布式事务,直接抛异常;只有不存在分布式事务的时候才正常执行caseNEVER:存在分布式事务,抛异常if(existingTransaction(tx)){thrownewTransactionException(String。format(Existingtransactionfoundfortransactionmarkedwithpropagationnever,xids,tx。getXid()));}else{不存在分布式事务,执行业务逻辑returnbusiness。execute();}一定要有分布式事务,分布式事务不存在的话,抛异常;caseMANDATORY:不存在分布式事务,抛异常if(notExistingTransaction(tx)){thrownewTransactionException(Noexistingtransactionfoundfortransactionmarkedwithpropagationmandatory);}Continueandexecutewithcurrenttransaction。default:thrownewTransactionException(NotSupportedPropagation:propagation);}上面的传播级别的逻辑处理完毕,下面就是公共的处理逻辑1。3如果当前分布式事务没有的话,那么我们就要创建新的分布式事务,此时我们就是分布式事务的发起者,也就是TM本身,否则不能称之为TMif(txnull){txGlobalTransactionContext。createNew();}开始准备干活的条件把我们这个方法的全局锁配置放进当前线程中,并且把线程中已有的全局锁的配置取出来我们在干完自己的活后,会把这个取出来的配置放回去的GlobalLockConfigpreviousConfigreplaceGlobalLockConfig(txInfo);try{2。如果我们是分布式事务的发起者的话,那么我们会和TC通信,并且拿到一个XID;如果我们不是分布式事务的发起者的话,那么这一步啥也不干这个XID可以从RootContext中获取beginTransaction(txInfo,tx);Otry{执行业务逻辑rsbusiness。execute();}catch(Throwableex){3。发生任何异常,我们准备启动回滚机制completeTransactionAfterThrowing(txInfo,tx,ex);}4。一切顺利,通知提交分布式事务commitTransaction(tx);}finally{5。恢复现场,把之前的配置放回去resumeGlobalLockConfig(previousConfig);触发回调triggerAfterCompletion();清理工作cleanUp();}}finally{恢复之前挂起的事务if(suspendedResourcesHolder!null){tx。resume(suspendedResourcesHolder);}}} 主要关注这一段代码:try{2。如果我们是分布式事务的发起者的话,那么我们会和TC通信,并且拿到一个XID;如果我们不是分布式事务的发起者的话,那么这一步啥也不干这个XID可以从RootContext中获取beginTransaction(txInfo,tx);Otry{执行业务逻辑rsbusiness。execute();}catch(Throwableex){3。发生任何异常,我们准备启动回滚机制completeTransactionAfterThrowing(txInfo,tx,ex);}4。一切顺利,通知提交分布式事务commitTransaction(tx);}finally{5。恢复现场,把之前的配置放回去resumeGlobalLockConfig(previousConfig);触发回调triggerAfterCompletion();清理工作cleanUp();}}finally{恢复之前挂起的事务if(suspendedResourcesHolder!null){tx。resume(suspendedResourcesHolder);}} 当任意RM分支事务产生异常后,都会触发TM执行completeTransactionAfterThrowing(txInfo,tx,ex),只有当所有的RM分支事务提交成功后,TM才会发起commitTransaction(tx);异常回滚privatevoidcompleteTransactionAfterThrowing(TransactionInfotxInfo,GlobalTransactiontx,ThrowableoriginalException)throwsTransactionalExecutor。ExecutionException{如果异常类型和指定的类型一致,那么发起回滚;不一致还是要提交分布式事务if(txInfo!nulltxInfo。rollbackOn(originalException)){try{回滚分布式事务rollbackTransaction(tx,originalException);}catch(TransactionExceptiontxe){回滚失败抛异常thrownewTransactionalExecutor。ExecutionException(tx,txe,TransactionalExecutor。Code。RollbackFailure,originalException);}}else{不是指定的异常类型,还是继续提交分布式事务commitTransaction(tx);}}privatevoidrollbackTransaction(GlobalTransactiontx,ThrowableoriginalException)throwsTransactionException,TransactionalExecutor。ExecutionException{执行回调,默认空回调triggerBeforeRollback();回滚tx。rollback();执行回调,默认空回调triggerAfterRollback();就算回滚没问题,照样抛异常,目的应该是告知开发人员此处产生了回滚thrownewTransactionalExecutor。ExecutionException(tx,GlobalStatus。RollbackRetrying。equals(tx。getLocalStatus())?TransactionalExecutor。Code。RollbackRetrying:TransactionalExecutor。Code。RollbackDone,originalException);}Overridepublicvoidrollback()throwsTransactionException{如果是分布式事务参与者,那么啥也不做,RM的回滚不在这里,这是TM的回滚if(roleGlobalTransactionRole。Participant){Participanthasnoresponsibilityofrollbackif(LOGGER。isDebugEnabled()){LOGGER。debug(IgnoreRollback():justinvolvedinglobaltransaction〔{}〕,xid);}}assertXIDNotNull();下面就是一个循环重试发起分布式事务回滚intretryROLLBACKRETRYCOUNT0?DEFAULTTMROLLBACKRETRYCOUNT:ROLLBACKRETRYCOUNT;try{while(retry0){try{发起回滚的核心代码statustransactionManager。rollback(xid);回滚成功跳出循环}catch(Throwableex){LOGGER。error(Failedtoreportglobalrollback〔{}〕,RetryCountdown:{},reason:{},this。getXid(),retry,ex。getMessage());重试失败次数完成才会跳出循环if(retry0){thrownewTransactionException(Failedtoreportglobalrollback,ex);}}}}finally{如果回滚的分布式事务就是当前的分布式事务,那么从当前线程中解绑XIDif(xid。equals(RootContext。getXID())){suspend();}}if(LOGGER。isInfoEnabled()){LOGGER。info(〔{}〕rollbackstatus:{},xid,status);}}OverridepublicGlobalStatusrollback(Stringxid)throwsTransactionException{准备发起请求给TC,回滚指定的分布式事务GlobalRollbackRequestglobalRollbacknewGlobalRollbackRequest();globalRollback。setXid(xid);GlobalRollbackResponseresponse(GlobalRollbackResponse)syncCall(globalRollback);returnresponse。getGlobalStatus();}分布式事务回滚逻辑中有以下几个点: 触发回滚需要产生的异常和注解中指定的异常一致才会发起回滚,否则还是继续提交;回滚是可以设置重试次数的,只有重试都失败了,才会导致回滚失败,否则只要有一次成功,那么回滚就成功;TM发起的回滚其实只是和TC发起一次分布式事务回滚的通信,并没有数据库的操作;分布式事务提交privatevoidcommitTransaction(GlobalTransactiontx)throwsTransactionalExecutor。ExecutionException{try{回调,默认空回调triggerBeforeCommit();分布式事务提交tx。commit();回调,默认空回调triggerAfterCommit();}catch(TransactionExceptiontxe){4。1提交出异常,提交失败thrownewTransactionalExecutor。ExecutionException(tx,txe,TransactionalExecutor。Code。CommitFailure);}}Overridepublicvoidcommit()throwsTransactionException{如果只是分布式事务参与者,那么啥也不干,TM只能有一个,哈哈if(roleGlobalTransactionRole。Participant){Participanthasnoresponsibilityofcommittingif(LOGGER。isDebugEnabled()){LOGGER。debug(IgnoreCommit():justinvolvedinglobaltransaction〔{}〕,xid);}}assertXIDNotNull();分布式事务提交也是有重试的intretryCOMMITRETRYCOUNT0?DEFAULTTMCOMMITRETRYCOUNT:COMMITRETRYCOUNT;try{while(retry0){try{发起分布式事务提交statustransactionManager。commit(xid);提交成功跳出循环}catch(Throwableex){LOGGER。error(Failedtoreportglobalcommit〔{}〕,RetryCountdown:{},reason:{},this。getXid(),retry,ex。getMessage());重试结束,依然失败就抛异常if(retry0){thrownewTransactionException(Failedtoreportglobalcommit,ex);}}}}finally{如果提交的分布式事务就是当前事务,那么需要清理当前线程中的XIDif(xid。equals(RootContext。getXID())){suspend();}}if(LOGGER。isInfoEnabled()){LOGGER。info(〔{}〕commitstatus:{},xid,status);}}OverridepublicGlobalStatuscommit(Stringxid)throwsTransactionException{发起分布式事务提交请求,这是与TC通信GlobalCommitRequestglobalCommitnewGlobalCommitRequest();globalCommit。setXid(xid);GlobalCommitResponseresponse(GlobalCommitResponse)syncCall(globalCommit);returnresponse。getGlobalStatus();} 分布式事务回滚也是可以设置重试次数的;分布式事务提交其实也是TM与TC进行通信,告知TC这个XID对应的分布式事务可以提交了;TC发起分布式事务提交或回滚 TC通过io。seata。server。coordinator。DefaultCore发起分布式事务提交:OverridepublicbooleandoGlobalCommit(GlobalSessionglobalSession,booleanretrying)throwsTransactionException{startcommittingeventMetricsPublisher。postSessionDoingEvent(globalSession,retrying);if(globalSession。isSaga()){successgetCore(BranchType。SAGA)。doGlobalCommit(globalSession,retrying);}else{直接从这里开始看,这里有一个forEach循环遍历分布式事务中的所有分支事务BooleanresultSessionHelper。forEach(globalSession。getSortedBranches(),branchSession{ifnotretrying,skipthecanBeCommittedAsyncbranchesif(!retryingbranchSession。canBeCommittedAsync()){returnCONTINUE;}BranchStatuscurrentStatusbranchSession。getStatus();if(currentStatusBranchStatus。PhaseOneFailed){SessionHelper。removeBranch(globalSession,branchSession,!retrying);returnCONTINUE;}前面都是一些状态的校验,下面开始发起分支事务的提交try{BranchStatusbranchStatusgetCore(branchSession。getBranchType())。branchCommit(globalSession,branchSession);下面代码可忽略if(isXaerNotaTimeout(globalSession,branchStatus)){LOGGER。info(CommitbranchXAERNOTAretrytimeout,xid{}branchId{},globalSession。getXid(),branchSession。getBranchId());branchStatusBranchStatus。PhaseTwoC}switch(branchStatus){如果该分支事务已经提交,说明提交成功了,那么移除该分支casePhaseTwoCommitted:SessionHelper。removeBranch(globalSession,branchSession,!retrying);returnCONTINUE;如果该分支事务已经提交失败,并且无法重试,那么打上标记casePhaseTwoCommitFailedUnretryable:notatbranchSessionHelper。endCommitFailed(globalSession,retrying);LOGGER。error(Committingglobaltransaction〔{}〕finallyfailed,causedbybranchtransaction〔{}〕commitfailed。,globalSession。getXid(),branchSession。getBranchId());default:进入重试队列if(!retrying){globalSession。queueToRetryCommit();}如果可以异步提交,那就直接跳过,定时任务会执行if(globalSession。canBeCommittedAsync()){LOGGER。error(Committingbranchtransaction〔{}〕,status:{}andwillretrylater,branchSession。getBranchId(),branchStatus);returnCONTINUE;}else{剩下的就是提交失败的LOGGER。error(Committingglobaltransaction〔{}〕failed,causedbybranchtransaction〔{}〕commitfailed,willretrylater。,globalSession。getXid(),branchSession。getBranchId());}}}catch(Exceptionex){StackTraceLogger。error(LOGGER,ex,Committingbranchtransactionexception:{},newString〔〕{branchSession。toString()});if(!retrying){出了异常,放入队列继续等待提交globalSession。queueToRetryCommit();抛出异常给TMthrownewTransactionException(ex);}}returnCONTINUE;});Returniftheresultisnotnullif(result!null){}Ifhasbranchandnotallremainingbranchescanbecommittedasynchronously,doprintlogandreturnfalseif(globalSession。hasBranch()!globalSession。canBeCommittedAsync()){LOGGER。info(CommittingglobaltransactionisNOTdone,xid{}。,globalSession。getXid());}if(!retrying){设置分布式事务状态为提交成功globalSession。setStatus(GlobalStatus。Committed);}}ifitsucceedsandthereisnobranch,retryingtrueistheasynchronousstatewhenretrying。EndCommittedisexecutedtoimproveconcurrencyperformance,andtheglobaltransactionends。。if(successglobalSession。getBranchSessions()。isEmpty()){SessionHelper。endCommitted(globalSession,retrying);LOGGER。info(Committingglobaltransactionissuccessfullydone,xid{}。,globalSession。getXid());}} 也就是说,TC在收到TM的提交信号后,会RPC逐个调用RM执行分支事务的提交:publicabstractclassAbstractRMHandlerextendsAbstractExceptionHandlerimplementsRMInboundHandler,TransactionMessageHandler{RM收到分支事务提交请求OverridepublicBranchCommitResponsehandle(BranchCommitRequestrequest){BranchCommitResponseresponsenewBranchCommitResponse();模版模式exceptionHandleTemplate(newAbstractCallbackBranchCommitRequest,BranchCommitResponse(){Overridepublicvoidexecute(BranchCommitRequestrequest,BranchCommitResponseresponse)throwsTransactionException{执行分支事务提交doBranchCommit(request,response);}},request,response);}protectedvoiddoBranchCommit(BranchCommitRequestrequest,BranchCommitResponseresponse)throwsTransactionException{Stringxidrequest。getXid();longbranchIdrequest。getBranchId();StringresourceIdrequest。getResourceId();StringapplicationDatarequest。getApplicationData();if(LOGGER。isInfoEnabled()){LOGGER。info(Branchcommitting:xidbranchIdresourceIdapplicationData);}执行分支事务提交BranchStatusstatusgetResourceManager()。branchCommit(request。getBranchType(),xid,branchId,resourceId,applicationData);返回响应结果response。setXid(xid);response。setBranchId(branchId);response。setBranchStatus(status);if(LOGGER。isInfoEnabled()){LOGGER。info(Branchcommitresult:status);}}} 根据上述代码,分布式事务的回滚其实也是TC接到TM的回滚信号后,通过RPC依次给RM发请求,触发各分支事务的回滚。 其次就是TC里面的异步提交或回滚了,在博客TC在seata分布式事务中的主要任务中,TC服务里面有多个定时任务,该定时任务就是通过定时检查各分布式事务的状态来判断是否执行提交或回滚的。所以分布式事务的提交与回滚还分为异步方式和同步方式。小结 根据上述源码分析,我们可以知道,分支事务的提交或回滚,是直接由TC服务触发,TM间接触发的;步骤可简单分解为以下几步:1。RM执行分支事务; 1。1:分支事务执行成功;1。2:分支事务执行失败; 2。TM管理分布式事务的提交或回滚; 2。1:当任意RM分支事务执行出现异常,TM通过RPC向TC服务发起分布式事务的回滚;2。2:只有所有RM都成功提交分支事务,TM通过RPC向TC服务发起分布式事务的提交; 3。TC服务依次向所有RM发起提交或回滚; 3。1:TC接收到TM的提交请求,查询出所有的分支RM,并依次向所有的RM发起提交请求;3。2:TC接收到TM的回滚请求,查询所有的分支RM,依次向所有RM发起回滚请求; 4。RM执行分支事务的提交或回滚; 4。1:RM收到TC的提交请求,提交分支事务;4。2:RM收到TC的回滚请求,回滚分支事务;