城市直播房产教育博客汽车
快传网
汽车报价
买车新车
博客专栏
专题精品
教育留学
高考读书
房产家居
彩票视频
直播黑猫
投资微博
城市上海
政务旅游

源码解析SeataAT模式中分支事务的提交或回滚是如何被触发

6月18日 乱人心投稿
  前言
  在之前的博客中,已经介绍过了TM在seataAT模式中的处理流程、TC在seata分布式事务中的主要任务以及RM在seataAT模式中的sql语句执行流程,下面我们通过源码讲述分布式事务是如何实现提交或回滚的。分支事务的提交或回滚
  在seataAT模式中,只有当所有的分支事务全部成功提交后,才会触发分布式事务的提交:publicclassConnectionProxyextendsAbstractConnectionProxy{privatevoidprocessGlobalTransactionCommit()throwsSQLException{try{注册分支事务,添加行锁,其实就是分布式锁register();}catch(TransactionExceptione){recognizeLockKeyConflictException(e,context。buildLockKeys());}try{插入undologUndoLogManagerFactory。getUndoLogManager(this。getDbType())。flushUndoLogs(this);提交本地事务targetConnection。commit();}catch(Throwableex){LOGGER。error(processconnectionProxycommiterror:{},ex。getMessage(),ex);上报分支事务提交失败状态report(false);抛出异常,最终会被TM捕捉到,触发分布式事务的回滚thrownewSQLException(ex);}if(ISREPORTSUCCESSENABLE){上报分支事务成功提交状态report(true);}恢复现场context。reset();}}
  在上述源码分析中,当所有的RM分支事务提交成功后,TC会接收到所有RM分支事务的状态,代码最终会执行到TM的模版逻辑中。TM触发分布式事务的提交或回滚
  TM模版代码:publicObjectexecute(TransactionalExecutorbusiness)throwsThrowable{1。拿到整理好的GlobalTransactional注解里面的配置信息TransactionInfotxInfobusiness。getTransactionInfo();if(txInfonull){thrownewShouldNeverHappenException(transactionInfodoesnotexist);}1。1获取当前的分布式事务,如果为null的话,说明这是分布式事务的发起者;如果不为null,说明这是分布式事务的参与者GlobalTransactiontxGlobalTransactionContext。getCurrent();1。2获取分布式事务的传播级别,其实就是按照spring的传播级别来一套,区别就是spring事务是本地事务,这是分布式事务,原理都一样PropagationpropagationtxInfo。getPropagation();SuspendedResourcesHoldersuspendedResourcesHtry{这个switch里面全都是处理分布式事务传播级别的switch(propagation){如果不支持分布式事务,如果当前存在事务,那么先挂起当前的分布式事务,再执行业务逻辑caseNOTSUPPORTED:分布式事务存在,先挂起if(existingTransaction(tx)){suspendedResourcesHoldertx。suspend();}执行业务逻辑returnbusiness。execute();如果是每次都要创建一个新的分布式事务,先把当前存在的分布式事务挂起,然后创建一个新分布式事务caseREQUIRESNEW:如果分布式事务存在,先挂起当前分布式事务,再创建一个新的分布式事务if(existingTransaction(tx)){suspendedResourcesHoldertx。suspend();txGlobalTransactionContext。createNew();}之所以用break,是为了后面的代码和其他的传播级别一起共用,业务逻辑肯定还是要执行的如果支持分布式事务,如果当前不存在分布式事务,那么直接执行业务逻辑,否则以分布式事务的方式执行业务逻辑caseSUPPORTS:如果不存在分布式事务,直接执行业务逻辑if(notExistingTransaction(tx)){returnbusiness。execute();}否则,以分布式事务的方式执行业务逻辑如果有分布式事务,就在当前分布式事务下执行业务逻辑,否则创建一个新的分布式事务执行业务逻辑caseREQUIRED:Ifcurrenttransactionisexisting,executewithcurrenttransaction,elsecontinueandexecutewithnewtransaction。如果不允许有分布式事务,那么一旦发现存在分布式事务,直接抛异常;只有不存在分布式事务的时候才正常执行caseNEVER:存在分布式事务,抛异常if(existingTransaction(tx)){thrownewTransactionException(String。format(Existingtransactionfoundfortransactionmarkedwithpropagationnever,xids,tx。getXid()));}else{不存在分布式事务,执行业务逻辑returnbusiness。execute();}一定要有分布式事务,分布式事务不存在的话,抛异常;caseMANDATORY:不存在分布式事务,抛异常if(notExistingTransaction(tx)){thrownewTransactionException(Noexistingtransactionfoundfortransactionmarkedwithpropagationmandatory);}Continueandexecutewithcurrenttransaction。default:thrownewTransactionException(NotSupportedPropagation:propagation);}上面的传播级别的逻辑处理完毕,下面就是公共的处理逻辑1。3如果当前分布式事务没有的话,那么我们就要创建新的分布式事务,此时我们就是分布式事务的发起者,也就是TM本身,否则不能称之为TMif(txnull){txGlobalTransactionContext。createNew();}开始准备干活的条件把我们这个方法的全局锁配置放进当前线程中,并且把线程中已有的全局锁的配置取出来我们在干完自己的活后,会把这个取出来的配置放回去的GlobalLockConfigpreviousConfigreplaceGlobalLockConfig(txInfo);try{2。如果我们是分布式事务的发起者的话,那么我们会和TC通信,并且拿到一个XID;如果我们不是分布式事务的发起者的话,那么这一步啥也不干这个XID可以从RootContext中获取beginTransaction(txInfo,tx);Otry{执行业务逻辑rsbusiness。execute();}catch(Throwableex){3。发生任何异常,我们准备启动回滚机制completeTransactionAfterThrowing(txInfo,tx,ex);}4。一切顺利,通知提交分布式事务commitTransaction(tx);}finally{5。恢复现场,把之前的配置放回去resumeGlobalLockConfig(previousConfig);触发回调triggerAfterCompletion();清理工作cleanUp();}}finally{恢复之前挂起的事务if(suspendedResourcesHolder!null){tx。resume(suspendedResourcesHolder);}}}
  主要关注这一段代码:try{2。如果我们是分布式事务的发起者的话,那么我们会和TC通信,并且拿到一个XID;如果我们不是分布式事务的发起者的话,那么这一步啥也不干这个XID可以从RootContext中获取beginTransaction(txInfo,tx);Otry{执行业务逻辑rsbusiness。execute();}catch(Throwableex){3。发生任何异常,我们准备启动回滚机制completeTransactionAfterThrowing(txInfo,tx,ex);}4。一切顺利,通知提交分布式事务commitTransaction(tx);}finally{5。恢复现场,把之前的配置放回去resumeGlobalLockConfig(previousConfig);触发回调triggerAfterCompletion();清理工作cleanUp();}}finally{恢复之前挂起的事务if(suspendedResourcesHolder!null){tx。resume(suspendedResourcesHolder);}}
  当任意RM分支事务产生异常后,都会触发TM执行completeTransactionAfterThrowing(txInfo,tx,ex),只有当所有的RM分支事务提交成功后,TM才会发起commitTransaction(tx);异常回滚privatevoidcompleteTransactionAfterThrowing(TransactionInfotxInfo,GlobalTransactiontx,ThrowableoriginalException)throwsTransactionalExecutor。ExecutionException{如果异常类型和指定的类型一致,那么发起回滚;不一致还是要提交分布式事务if(txInfo!nulltxInfo。rollbackOn(originalException)){try{回滚分布式事务rollbackTransaction(tx,originalException);}catch(TransactionExceptiontxe){回滚失败抛异常thrownewTransactionalExecutor。ExecutionException(tx,txe,TransactionalExecutor。Code。RollbackFailure,originalException);}}else{不是指定的异常类型,还是继续提交分布式事务commitTransaction(tx);}}privatevoidrollbackTransaction(GlobalTransactiontx,ThrowableoriginalException)throwsTransactionException,TransactionalExecutor。ExecutionException{执行回调,默认空回调triggerBeforeRollback();回滚tx。rollback();执行回调,默认空回调triggerAfterRollback();就算回滚没问题,照样抛异常,目的应该是告知开发人员此处产生了回滚thrownewTransactionalExecutor。ExecutionException(tx,GlobalStatus。RollbackRetrying。equals(tx。getLocalStatus())?TransactionalExecutor。Code。RollbackRetrying:TransactionalExecutor。Code。RollbackDone,originalException);}Overridepublicvoidrollback()throwsTransactionException{如果是分布式事务参与者,那么啥也不做,RM的回滚不在这里,这是TM的回滚if(roleGlobalTransactionRole。Participant){Participanthasnoresponsibilityofrollbackif(LOGGER。isDebugEnabled()){LOGGER。debug(IgnoreRollback():justinvolvedinglobaltransaction〔{}〕,xid);}}assertXIDNotNull();下面就是一个循环重试发起分布式事务回滚intretryROLLBACKRETRYCOUNT0?DEFAULTTMROLLBACKRETRYCOUNT:ROLLBACKRETRYCOUNT;try{while(retry0){try{发起回滚的核心代码statustransactionManager。rollback(xid);回滚成功跳出循环}catch(Throwableex){LOGGER。error(Failedtoreportglobalrollback〔{}〕,RetryCountdown:{},reason:{},this。getXid(),retry,ex。getMessage());重试失败次数完成才会跳出循环if(retry0){thrownewTransactionException(Failedtoreportglobalrollback,ex);}}}}finally{如果回滚的分布式事务就是当前的分布式事务,那么从当前线程中解绑XIDif(xid。equals(RootContext。getXID())){suspend();}}if(LOGGER。isInfoEnabled()){LOGGER。info(〔{}〕rollbackstatus:{},xid,status);}}OverridepublicGlobalStatusrollback(Stringxid)throwsTransactionException{准备发起请求给TC,回滚指定的分布式事务GlobalRollbackRequestglobalRollbacknewGlobalRollbackRequest();globalRollback。setXid(xid);GlobalRollbackResponseresponse(GlobalRollbackResponse)syncCall(globalRollback);returnresponse。getGlobalStatus();}分布式事务回滚逻辑中有以下几个点:
  触发回滚需要产生的异常和注解中指定的异常一致才会发起回滚,否则还是继续提交;回滚是可以设置重试次数的,只有重试都失败了,才会导致回滚失败,否则只要有一次成功,那么回滚就成功;TM发起的回滚其实只是和TC发起一次分布式事务回滚的通信,并没有数据库的操作;分布式事务提交privatevoidcommitTransaction(GlobalTransactiontx)throwsTransactionalExecutor。ExecutionException{try{回调,默认空回调triggerBeforeCommit();分布式事务提交tx。commit();回调,默认空回调triggerAfterCommit();}catch(TransactionExceptiontxe){4。1提交出异常,提交失败thrownewTransactionalExecutor。ExecutionException(tx,txe,TransactionalExecutor。Code。CommitFailure);}}Overridepublicvoidcommit()throwsTransactionException{如果只是分布式事务参与者,那么啥也不干,TM只能有一个,哈哈if(roleGlobalTransactionRole。Participant){Participanthasnoresponsibilityofcommittingif(LOGGER。isDebugEnabled()){LOGGER。debug(IgnoreCommit():justinvolvedinglobaltransaction〔{}〕,xid);}}assertXIDNotNull();分布式事务提交也是有重试的intretryCOMMITRETRYCOUNT0?DEFAULTTMCOMMITRETRYCOUNT:COMMITRETRYCOUNT;try{while(retry0){try{发起分布式事务提交statustransactionManager。commit(xid);提交成功跳出循环}catch(Throwableex){LOGGER。error(Failedtoreportglobalcommit〔{}〕,RetryCountdown:{},reason:{},this。getXid(),retry,ex。getMessage());重试结束,依然失败就抛异常if(retry0){thrownewTransactionException(Failedtoreportglobalcommit,ex);}}}}finally{如果提交的分布式事务就是当前事务,那么需要清理当前线程中的XIDif(xid。equals(RootContext。getXID())){suspend();}}if(LOGGER。isInfoEnabled()){LOGGER。info(〔{}〕commitstatus:{},xid,status);}}OverridepublicGlobalStatuscommit(Stringxid)throwsTransactionException{发起分布式事务提交请求,这是与TC通信GlobalCommitRequestglobalCommitnewGlobalCommitRequest();globalCommit。setXid(xid);GlobalCommitResponseresponse(GlobalCommitResponse)syncCall(globalCommit);returnresponse。getGlobalStatus();}
  分布式事务回滚也是可以设置重试次数的;分布式事务提交其实也是TM与TC进行通信,告知TC这个XID对应的分布式事务可以提交了;TC发起分布式事务提交或回滚
  TC通过io。seata。server。coordinator。DefaultCore发起分布式事务提交:OverridepublicbooleandoGlobalCommit(GlobalSessionglobalSession,booleanretrying)throwsTransactionException{startcommittingeventMetricsPublisher。postSessionDoingEvent(globalSession,retrying);if(globalSession。isSaga()){successgetCore(BranchType。SAGA)。doGlobalCommit(globalSession,retrying);}else{直接从这里开始看,这里有一个forEach循环遍历分布式事务中的所有分支事务BooleanresultSessionHelper。forEach(globalSession。getSortedBranches(),branchSession{ifnotretrying,skipthecanBeCommittedAsyncbranchesif(!retryingbranchSession。canBeCommittedAsync()){returnCONTINUE;}BranchStatuscurrentStatusbranchSession。getStatus();if(currentStatusBranchStatus。PhaseOneFailed){SessionHelper。removeBranch(globalSession,branchSession,!retrying);returnCONTINUE;}前面都是一些状态的校验,下面开始发起分支事务的提交try{BranchStatusbranchStatusgetCore(branchSession。getBranchType())。branchCommit(globalSession,branchSession);下面代码可忽略if(isXaerNotaTimeout(globalSession,branchStatus)){LOGGER。info(CommitbranchXAERNOTAretrytimeout,xid{}branchId{},globalSession。getXid(),branchSession。getBranchId());branchStatusBranchStatus。PhaseTwoC}switch(branchStatus){如果该分支事务已经提交,说明提交成功了,那么移除该分支casePhaseTwoCommitted:SessionHelper。removeBranch(globalSession,branchSession,!retrying);returnCONTINUE;如果该分支事务已经提交失败,并且无法重试,那么打上标记casePhaseTwoCommitFailedUnretryable:notatbranchSessionHelper。endCommitFailed(globalSession,retrying);LOGGER。error(Committingglobaltransaction〔{}〕finallyfailed,causedbybranchtransaction〔{}〕commitfailed。,globalSession。getXid(),branchSession。getBranchId());default:进入重试队列if(!retrying){globalSession。queueToRetryCommit();}如果可以异步提交,那就直接跳过,定时任务会执行if(globalSession。canBeCommittedAsync()){LOGGER。error(Committingbranchtransaction〔{}〕,status:{}andwillretrylater,branchSession。getBranchId(),branchStatus);returnCONTINUE;}else{剩下的就是提交失败的LOGGER。error(Committingglobaltransaction〔{}〕failed,causedbybranchtransaction〔{}〕commitfailed,willretrylater。,globalSession。getXid(),branchSession。getBranchId());}}}catch(Exceptionex){StackTraceLogger。error(LOGGER,ex,Committingbranchtransactionexception:{},newString〔〕{branchSession。toString()});if(!retrying){出了异常,放入队列继续等待提交globalSession。queueToRetryCommit();抛出异常给TMthrownewTransactionException(ex);}}returnCONTINUE;});Returniftheresultisnotnullif(result!null){}Ifhasbranchandnotallremainingbranchescanbecommittedasynchronously,doprintlogandreturnfalseif(globalSession。hasBranch()!globalSession。canBeCommittedAsync()){LOGGER。info(CommittingglobaltransactionisNOTdone,xid{}。,globalSession。getXid());}if(!retrying){设置分布式事务状态为提交成功globalSession。setStatus(GlobalStatus。Committed);}}ifitsucceedsandthereisnobranch,retryingtrueistheasynchronousstatewhenretrying。EndCommittedisexecutedtoimproveconcurrencyperformance,andtheglobaltransactionends。。if(successglobalSession。getBranchSessions()。isEmpty()){SessionHelper。endCommitted(globalSession,retrying);LOGGER。info(Committingglobaltransactionissuccessfullydone,xid{}。,globalSession。getXid());}}
  也就是说,TC在收到TM的提交信号后,会RPC逐个调用RM执行分支事务的提交:publicabstractclassAbstractRMHandlerextendsAbstractExceptionHandlerimplementsRMInboundHandler,TransactionMessageHandler{RM收到分支事务提交请求OverridepublicBranchCommitResponsehandle(BranchCommitRequestrequest){BranchCommitResponseresponsenewBranchCommitResponse();模版模式exceptionHandleTemplate(newAbstractCallbackBranchCommitRequest,BranchCommitResponse(){Overridepublicvoidexecute(BranchCommitRequestrequest,BranchCommitResponseresponse)throwsTransactionException{执行分支事务提交doBranchCommit(request,response);}},request,response);}protectedvoiddoBranchCommit(BranchCommitRequestrequest,BranchCommitResponseresponse)throwsTransactionException{Stringxidrequest。getXid();longbranchIdrequest。getBranchId();StringresourceIdrequest。getResourceId();StringapplicationDatarequest。getApplicationData();if(LOGGER。isInfoEnabled()){LOGGER。info(Branchcommitting:xidbranchIdresourceIdapplicationData);}执行分支事务提交BranchStatusstatusgetResourceManager()。branchCommit(request。getBranchType(),xid,branchId,resourceId,applicationData);返回响应结果response。setXid(xid);response。setBranchId(branchId);response。setBranchStatus(status);if(LOGGER。isInfoEnabled()){LOGGER。info(Branchcommitresult:status);}}}
  根据上述代码,分布式事务的回滚其实也是TC接到TM的回滚信号后,通过RPC依次给RM发请求,触发各分支事务的回滚。
  其次就是TC里面的异步提交或回滚了,在博客TC在seata分布式事务中的主要任务中,TC服务里面有多个定时任务,该定时任务就是通过定时检查各分布式事务的状态来判断是否执行提交或回滚的。所以分布式事务的提交与回滚还分为异步方式和同步方式。小结
  根据上述源码分析,我们可以知道,分支事务的提交或回滚,是直接由TC服务触发,TM间接触发的;步骤可简单分解为以下几步:1。RM执行分支事务;
  1。1:分支事务执行成功;1。2:分支事务执行失败;
  2。TM管理分布式事务的提交或回滚;
  2。1:当任意RM分支事务执行出现异常,TM通过RPC向TC服务发起分布式事务的回滚;2。2:只有所有RM都成功提交分支事务,TM通过RPC向TC服务发起分布式事务的提交;
  3。TC服务依次向所有RM发起提交或回滚;
  3。1:TC接收到TM的提交请求,查询出所有的分支RM,并依次向所有的RM发起提交请求;3。2:TC接收到TM的回滚请求,查询所有的分支RM,依次向所有RM发起回滚请求;
  4。RM执行分支事务的提交或回滚;
  4。1:RM收到TC的提交请求,提交分支事务;4。2:RM收到TC的回滚请求,回滚分支事务;
投诉 评论 转载

朱婷和张常宁休战,此中国女排二队与上届的相较前景如何呢?上届女排世界联赛,郎导率领由张常宁领衔的中国女排二队出战,结果前面三周的比赛输球太多,当主力增援之后,最后一个阶段的比赛全胜,最终也仅名列第五名,没能杀入前四令到球迷非常不满。……源码解析SeataAT模式中分支事务的提交或回滚是如何被触发前言在之前的博客中,已经介绍过了TM在seataAT模式中的处理流程、TC在seata分布式事务中的主要任务以及RM在seataAT模式中的sql语句执行流程,下面我们通……什么是真空衰变?真空衰变真的能以光速毁灭整个宇宙吗?真空衰变(Vacuumdecay)是物理学家根据量子场论推测出的一种假说,有一种观点将其解读为:如果宇宙中的某一区域出现了真空衰变,那么真空衰变就会以光速从该区域向四面八方扩散……央视中秋晚会毛不易质朴李宇春惊艳但最震撼的还是太空三人组今年中秋怎么过?吃月饼,赏明月,看央视秋晚。一样都不能少。一年一度中秋夜,万家灯火时,人月共团圆。在这阖家欢乐的日子,央视秋晚又给我们带来了一道视觉盛宴……千元手机这4款才是王者,配置均衡性能很强,值得闭眼入手可不要小瞧一千多的手机,现在的千元手机是越做越好了,不仅会用上旗舰处理器,各类配置也能做到比较均衡,如果不出意外的话,买来用个三年还是没问题的。甚至平时还能运行大型游戏,……勇士官宣最佳球员!库里328仍落选,官推赞1。48亿先生无处落后了47分钟,勇士还是在最后一分钟完成逆转!北京时间5月10日,勇士101比98击败灰熊,系列赛3比1灰熊,晋级只剩一步之遥。勇士官方评选了此战最佳球员,不是砍下32分8助攻……一夜蒸发960多亿,芯片巨头们的寒冬如何熬过去硅基世界(图片来源:TheVerge)钛媒体App推出产业报道专题硅基世界,长期关注全球半导体领域的技术与产业升级,洞悉产业一手资讯、深度趋势。10月8日,一则美国芯片巨头……抖音业绩低于预期,将全力押注跨境电商字节跳动业绩低于预期这几年互联网江湖最大的搅局者字节跳动,发展也陷入了瓶颈。日前,抖音的运营主体字节跳动CEO梁汝波在公司全员会议上表示,公司2022年业绩低于预期……秘鲁免签一篇解决你所有关于签证的问题〔这是我的签证教程的第十篇:秘鲁篇。下一篇澳大利亚〕,喜欢的请关注哦2016年9月是秘鲁旅行的里程碑时刻有条件免签政策正式落地这意味着神秘秘鲁不再遥远去……赵丽颖登时装十月刊封面,羊毛卷裸色系主打居家女友风,太美赵丽颖不愧是85后小花的扛把子!接连登上重要时尚女刊金九银十封面。此前,以两套高定、四个封面占据《费加罗》金九刊,现在又收获《时装》十月周年刊封面,风头强劲。相比其……钟楚曦扎蟋蟀头出门,贴头皮发型,没几个人能驾驭轻易别尝试哈喽,大家好,我是biu时尚,很高兴又和大家分享明星时尚与搭配技巧!希望我的文章让你对时尚更加的感兴趣,让本身就好看的你,更加的有魅力!很多女孩子在选择时尚搭配的时候,可……气质不够硬就别再尬演军人了这8位明星演军人,看着让人真难受文阅栀编辑阅栀电视剧《士兵突击》之所以能够火遍全国,获得9。4的高评分,成为军旅题材影视剧中的佼佼者,很大一部分原因在于演员。张译、段奕宏、张国强和王宝强等演……
凤凰古城静态管理致游客滞留游客滞留隔离期间食宿免费重磅消息!国家教育平台上线338所知名高校课程免费开放豫健中医药膳坊气血两虚型大肠癌的饮食疗法41岁老将真拼了!破世界杯奖牌赛纪录,拿下铜牌完美谢幕李梦王思雨奖金超百万!女篮回归,平分奖金曝光,武桐桐含泪致歉说出来你别不信!只有这5人能让全联盟摆烂44大惊喜,美国职业大联盟建材行业年度策略需求复苏产能有序扩张,戴维斯双击值得期待每一间酒店里的情绪,其实都是一座城市的喃喃低语吴镇宇14岁儿子确诊,曾因录爸爸留下永久性伤疤,网友可怜有梅罗那味了!姆哈争霸已全面拉开,足坛新时代全面开启十个冷知识地球有寿命吗?会有灭亡的那一天吗?看完算长知识了
40场26球淘汰皇马,曼城拜仁争抢塔迪奇老年人如何保存收藏钱币让学习更轻松的学习方法,值得教给每个孩子今年冬天会冷吗2020关于活力的发言稿有关寿险公司客户服务理论与实践论文夜读丨人到中年,请远离这4件事巴西队功勋指挥官成国安新援热门,曾在世界杯踢主力,值得期待它是洗肠草,冬天每周吃2次,清肠刮油,随手一炒,真鲜美抄书打卡赚收益第五天人生永远没有太远的开始把创新代入音乐课堂真的来了但受益者不是你想的那些行业

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找江西南阳嘉兴昆明铜陵滨州广东西昌常德梅州兰州阳江运城金华广西萍乡大理重庆诸暨泉州安庆南充武汉辽宁