from:cnblogs。comshanmlp16584514。htm事务的使用RocketMQ事务的使用场景单体架构下的事务 在单体系统的开发过程中,假如某个场景下需要对数据库的多张表进行操作,为了保证数据的一致性,一般会使用事务,将所有的操作全部提交或者在出错的时候全部回滚。以创建订单为例,假设下单后需要做两个操作:在订单表生成订单在积分表增加本次订单增加的积分记录 在单体架构下只需使用Transactional开启事务,就可以保证数据的一致性:Transactionalpublicvoidorder(){StringorderIdUUID。randomUUID()。toString();生成订单orderService。createOrder(orderId);增加积分creditService。addCredits(orderId);} 然而现在越来越多系统开始使用分布式架构,在分布式架构下,订单系统和积分系统可能是两个独立的服务,此时就不能使用上述的方法开启事务了,因为它们不处于同一个事务中,在出错的情况下,无法进行全部回滚,只能对当前服务的事务进行回滚,所以就有可能出现订单生成成功但是积分服务增加积分失败的情况(也可能相反),此时数据处于不一致的状态。分布式架构下的事务 分布式架构下如果需要保证事务的一致性,需要使用分布式事务,分布式事务的实现方式有多种,这里我们先看通过RocketMQ事务的实现方式。 同样以下单流程为例,在分布式架构下的处理流程如下:订单服务生成订单发送订单生成的MQ消息,积分服务订阅消息,有新的订单生成之后消费消息,增加对应的积分记录 普通MQ消息存在的问题 如果使用Transactional发送普通MQ的方式,看下存在的问题:假如订单创建成功,MQ消息发送成功,但是order方法在返回的前一刻,服务突然宕机,由于开启了事务,事务还未提交(方法结束后才会正常提交),所以订单表并未生成记录,但是MQ却已经发送成功并且被积分服务消费,此时就会存在订单未创建但是积分记录增加的情况假如先发送MQ消息再创建订单呢,此时问题就更明显了,如果MQ消息发送成功,创建订单失败,那么同样处于不一致的状态Transactionalpublicvoidorder(){StringorderIdUUID。randomUUID()。toString();创建订单OrderorderorderService。createOrder(orderDTO。getOrderId());发送订单创建的MQ消息sendOrderMessge(order);} 解决上述问题的方式就是使用RocketMQ事务消息。 RocketMQ事务消息的使用 使用事务消息需要实现自定义的事务监听器,TransactionListener提供了本地事务执行和状态回查的接口,executeLocalTransaction方法用于执行我们的本地事务,checkLocalTransaction是一种补偿机制,在异常情况下如果未收到事务的提交请求,会调用此方法进行事务状态查询,以此决定是否将事务进行提交回滚:publicinterfaceTransactionListener{执行本地事务parammsgHalf(prepare)messagehalf消息paramargCustombusinessparameterreturnTransactionstateLocalTransactionStateexecuteLocalTransaction(finalMessagemsg,finalObjectarg);本地事务状态回查parammsgCheckmessagereturnTransactionstateLocalTransactionStatecheckLocalTransaction(finalMessageExtmsg);} 这里我们实现自定义的事务监听器OrderTransactionListenerImpl:executeLocalTransaction方法中创建订单,如果创建成功返回COMMITMESSAGE,如果出现异常返回ROLLBACKMESSAGE。checkLocalTransaction方法中回查事务状态,根据消息体中的订单ID查询订单是否已经创建,如果创建成功提交事务,如果未获取到认为失败,此时回滚事务。publicclassOrderTransactionListenerImplimplementsTransactionListener{AutowiredprivateOrderServiceorderSOverridepublicLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{StringbodynewString(msg。getBody(),Charset。forName(UTF8));OrderDTOorderDTOJSON。parseObject(body,OrderDTO。class);模拟生成订单orderService。createOrder(orderDTO。getOrderId());}catch(Exceptione){出现异常,返回回滚状态returnLocalTransactionState。ROLLBACKMESSAGE;}创建成功,返回提交状态returnLocalTransactionState。COMMITMESSAGE;}OverridepublicLocalTransactionStatecheckLocalTransaction(MessageExtmsg){StringbodynewString(msg。getBody(),Charset。forName(UTF8));OrderDTOorderDTOJSON。parseObject(body,OrderDTO。class);try{根据订单ID查询订单是否存在OrderorderorderService。getOrderByOrderId(orderDTO。getOrderId());if(null!order){returnLocalTransactionState。COMMITMESSAGE;}}catch(Exceptione){returnLocalTransactionState。ROLLBACKMESSAGE;}returnLocalTransactionState。ROLLBACKMESSAGE;}} 接下来看如何发送事务消息,事务消息对应的生产者为TransactionMQProducer,创建TransactionMQProducer之后,设置上一步自定义的事务监听器OrderTransactionListenerImpl,然后将订单ID放入消息体中,调用sendMessageInTransaction发送事务消息:publicclassTransactionProducer{publicstaticvoidmain(String〔〕args)throwsMQClientException,InterruptedException{创建下单事务监听器TransactionListenertransactionListenernewOrderTransactionListenerImpl();创建生产者TransactionMQProducerproducernewTransactionMQProducer(ordergroup);事务状态回查线程池ExecutorServiceexecutorServicenewThreadPoolExecutor(2,5,100,TimeUnit。SECONDS,newArrayBlockingQueueRunnable(2000),newThreadFactory(){OverridepublicThreadnewThread(Runnabler){ThreadthreadnewThread(r);thread。setName(clienttransactionmsgcheckthread);}});设置线程池producer。setExecutorService(executorService);设置事务监听器producer。setTransactionListener(transactionListener);启动生产者producer。start();try{创建订单消息OrderDTOorderDTOnewOrderDTO();模拟生成订单唯一标识orderDTO。setOrderId(UUID。randomUUID()。toString());转为字节数组byte〔〕msgBodyJSON。toJSONString(orderDTO)。getBytes(RemotingHelper。DEFAULTCHARSET);构建消息MessagemsgnewMessage(ORDERTOPIC,msgBody);调用sendMessageInTransaction发送事务消息SendResultsendResultproducer。sendMessageInTransaction(msg,null);System。out。printf(sendResult。toString());Thread。sleep(10);}catch(MQClientExceptionUnsupportedEncodingExceptione){e。printStackTrace();}for(inti0;i100000;i){Thread。sleep(1000);}producer。shutdown();}} 事务的执行流程:在订单服务下单后,向Borker发送生成订单的事务消息,投递到ORDERTOPIC主题中Broker收到事务消息之后,不会直接投递到ORDERTOPIC主题中,而是先放在另外一个主题中,也叫half主题,half主题对消费者不可见half主题加入消息成功之后,会回调事务监听器的的executeLocalTransaction方法,执行本地事务,也就是订单创建,如果创建成功返回COMMIT状态,如果出现异常返回ROLLBACK状态根据上一步的返回状态,进行结束事务的处理提交:从half主题中删除消息,然后将消息投送到ORDERTOPIC主题中,积分服务订阅ORDERTOPIC主题进行消费,生成积分记录回滚:从half主题中删除消息即可如果本地事务返回的执行结果状态由于网络原因或者其他原因未能成功的发送给Broker,Broker未收到事务的执行结果,在补偿机制定时检查half主题中消息的事务执行状态时,会回调事务监听器checkLocalTransaction的接口,进行状态回查,判断订单是否创建成功,然后进行结束事务的处理 使用事务消息不会存在订单创建失败但是消息发送成功的情况,不过你可能还有一个疑问,假如订单创建成功了,消息已经投送到队列中,但是积分服务在消费的时候失败了,这样数据还是处于不一致的状态,个人感觉,积分服务可以在失败的时候进行重试或者进行一些其他的补偿机制来保证积分记录成功的生成,在极端情况下积分记录依旧没有生成,此时可能就要人工接入处理了。RocketMQ事务实现原理 RocketMQ在4。3。0版中开始支持事务消息,它使用两阶段提交协议实现事务消息,同时增加补偿机制定时对事务的状态进行回查,来处理未提交回滚的事务。 两阶段提交 发送事务消息分为两个阶段: 第一阶段:生产者向Broker发送half(prepare)消息,生产者发送事务消息的时候,消息不会直接存入对应的主题中,而是先将消息存入RMQSYSTRANSHALFTOPIC主题中,此时消息对消费者不可见,不能被消费者消费,称为half消息,half消息发送成功之后,开始执行本地事务。 第二阶段:提交阶段,根据第一阶段的本地事务执行结果来决定是提交事务还是回滚事务,提交或者回滚的事务会从RMQSYSTRANSHALFTOPIC中删除,对于提交的事务消息,会将消息投送到实际的主题队列中,之后消费者可以从队列中拉取到消息进行消费,对于回滚的事务消息,直接从RMQSYSTRANSHALFTOPIC主题中删除即可。 注意:由于RocketMQ追加写的性能并不会直接从RMQSYSTRANSHALFTOPIC队列中删除消息,而是使用了另外一个队列,将已提交或者回滚的事务放入到OP队列中,在补偿机制对half消息进行检查的时候会从OP中判断是消息是否已经提交或者回滚。 补偿机制 两阶段提交事务的过程中,任一阶段出现异常都有可能导致事务未能成功的进行提交回滚,所以需要增加一种补偿机制,定时对RMQSYSTRANSHALFTOPIC主题中的half消息进行处理。 RocketMQ使用了一种回查机制,在处理half消息时,对该消息的本地事务执行状态进行回查,根据回查结果决定是否需要提交回滚,或者是等待下一次回查。 接下来就从源码的角度研究一下事务的实现原理。 上面可知,发送事务消息调用的是TransactionMQProducer的sendMessageInTransaction方法:publicclassTransactionMQProducerextendsDefaultMQProducer{OverridepublicTransactionSendResultsendMessageInTransaction(finalMessagemsg,finalObjectarg)throwsMQClientException{if(nullthis。transactionListener){thrownewMQClientException(TransactionListenerisnull,null);}设置主题msg。setTopic(NamespaceUtil。wrapNamespace(this。getNamespace(),msg。getTopic()));发送事务消息returnthis。defaultMQProducerImpl。sendMessageInTransaction(msg,null,arg);}} sendMessageInTransaction在DefaultMQProducerImpl中实现,主要有以下几个步骤:获取事务监听器TransactionListener,如果获取为空或者本地事务执行器LocalTransactionExecuter为空将抛出异常,因为需要通过TransactionListener或者LocalTransactionExecuter来执行本地事务,所以不能为空在消息中设置prepared属性,此时与普通消息(非事务消息)相比多了PROPERTYTRANSACTIONPREPARED属性调用send方法发送prepared消息也就是half消息,发送消息的流程与普通消息一致根据消息的发送结果判断:如果发送成功执行本地事务,并返回本地事务执行结果状态,如果返回的执行状态结果为空,将本地事务状态设置为UNKNOW发送成功之外的其他情况,包括FLUSHDISKTIMEOUT刷盘超时、FLUSHSLAVETIMEOUT和SLAVENOTAVAILABLE从节点不可用三种情况,此时意味着half消息发送失败,本地事务状态置为ROLLBACKMESSAGE回滚消息调用endTransaction方法结束事务publicclassDefaultMQProducerImplimplementsMQProducerInner{发送事务消息publicTransactionSendResultsendMessageInTransaction(finalMessagemsg,finalLocalTransactionExecuterlocalTransactionExecuter,finalObjectarg)throwsMQClientException{获取事务监听器TransactionListenertransactionListenergetCheckListener();如果本地事务执行器或者监听为空if(nulllocalTransactionExecuternulltransactionListener){thrownewMQClientException(tranExecutorisnull,null);}。。。SendResultsendR设置prepared属性MessageAccessor。putProperty(msg,MessageConst。PROPERTYTRANSACTIONPREPARED,true);设置生产者组MessageAccessor。putProperty(msg,MessageConst。PROPERTYPRODUCERGROUP,this。defaultMQProducer。getProducerGroup());try{发送消息sendResultthis。send(msg);}catch(Exceptione){thrownewMQClientException(sendmessageException,e);}本地事务状态LocalTransactionStatelocalTransactionStateLocalTransactionState。UNKNOW;ThrowablelocalEswitch(sendResult。getSendStatus()){判断消息发送状态caseSENDOK:{如果发送成功try{。。。if(null!localTransactionExecuter){如果本地事务执行器不为空执行本地事务localTransactionStatelocalTransactionExecuter。executeLocalTransactionBranch(msg,arg);}elseif(transactionListener!null){如果事务监听器不为空log。debug(UsednewtransactionAPI);执行本地事务localTransactionStatetransactionListener。executeLocalTransaction(msg,arg);}if(nulllocalTransactionState){如果本地事务状态为空,设置为UNKNOWlocalTransactionStateLocalTransactionState。UNKNOW;}。。。}catch(Throwablee){log。info(executeLocalTransactionBranchexception,e);log。info(msg。toString());localE}}caseFLUSHDISKTIMEOUT:caseFLUSHSLAVETIMEOUT:caseSLAVENOTAVAILABLE:localTransactionStateLocalTransactionState。ROLLBACKMESSAGE;本地事务状态设置为回滚default:}try{结束事务this。endTransaction(msg,sendResult,localTransactionState,localException);}catch(Exceptione){log。warn(localtransactionexecutelocalTransactionState,butendbrokertransactionfailed,e);}。。。returntransactionSendR}}half消息处理 Broker对消息发送请求的处理在SendMessageProcessor中,当Broker收到消息后,判断消息是否含有PROPERTYTRANSACTIONPREPARED属性,如果含有prepared属性,会获取TransactionalMessageService,然后调用asyncPrepareMessage对消息进行处理:publicclassSendMessageProcessorextendsAbstractSendMessageProcessorimplementsNettyRequestProcessor{privateCompletableFutureRemotingCommandasyncSendMessage(ChannelHandlerContextctx,RemotingCommandrequest,SendMessageContextmqtraceContext,SendMessageRequestHeaderrequestHeader){finalRemotingCommandresponsepreSend(ctx,request,requestHeader);finalSendMessageResponseHeaderresponseHeader(SendMessageResponseHeader)response。readCustomHeader();。。。CompletableFuturePutMessageResultputMessageR获取prepared属性标记StringtransFlagorigProps。get(MessageConst。PROPERTYTRANSACTIONPREPARED);如果事务标记不为空if(transFlag!nullBoolean。parseBoolean(transFlag)){if(this。brokerController。getBrokerConfig()。isRejectTransactionMessage()){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(thebroker〔this。brokerController。getBrokerConfig()。getBrokerIP1()〕sendingtransactionmessageisforbidden);returnCompletableFuture。completedFuture(response);}事务消息持久化putMessageResultthis。brokerController。getTransactionalMessageService()。asyncPrepareMessage(msgInner);}else{普通消息持久化putMessageResultthis。brokerController。getMessageStore()。asyncPutMessage(msgInner);}returnhandlePutMessageResultFuture(putMessageResult,response,request,msgInner,responseHeader,mqtraceContext,ctx,queueIdInt);}} TransactionalMessageServiceImpl的asyncPrepareMessage方法中,又调用了TransactionalMessageBridge的asyncPutHalfMessage方法,添加half消息:publicclassTransactionalMessageServiceImplimplementsTransactionalMessageService{OverridepublicCompletableFuturePutMessageResultasyncPrepareMessage(MessageExtBrokerInnermessageInner){添加half消息returntransactionalMessageBridge。asyncPutHalfMessage(messageInner);}} 在TransactionalMessageBridge的asyncPutHalfMessage方法中,调用了parseHalfMessageInner方法设置half消息的相关属性。 因为是half消息,此时还不能直接加入到实际的消息队列中,否则一旦加入就会被消费者消费,所以需要先对half消息暂存,等收到消息提交请求时才可以添加到实际的消息队列中,RocketMQ设置了一个RMQSYSTRANSHALFTOPIC主题来暂存half消息。 在parseHalfMessageInner方法中,会对消息进行如下处理:设置消息实际的主题和队列ID,待收到事务提交请求后恢复实际的主题和队列ID,向实际的队列中添加消息更改消息的主题为half消息主题RMQSYSTRANSHALFTOPIC,先将消息投送到half消息队列中half主题对应的消息队列ID为0,所以更改消息的队列ID为0 之后调用asyncPutMessage添加消息,接下来的流程就和普通消息的添加一致了,具体可参考【RocketMQ】消息的存储:publicclassTransactionalMessageBridge{publicCompletableFuturePutMessageResultasyncPutHalfMessage(MessageExtBrokerInnermessageInner){添加消息returnstore。asyncPutMessage(parseHalfMessageInner(messageInner));}privateMessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInnermsgInner){设置实际的主题MessageAccessor。putProperty(msgInner,MessageConst。PROPERTYREALTOPIC,msgInner。getTopic());设置实际的队列IDMessageAccessor。putProperty(msgInner,MessageConst。PROPERTYREALQUEUEID,String。valueOf(msgInner。getQueueId()));msgInner。setSysFlag(MessageSysFlag。resetTransactionValue(msgInner。getSysFlag(),MessageSysFlag。TRANSACTIONNOTTYPE));设置事务主题RMQSYSTRANSHALFTOPICmsgInner。setTopic(TransactionalMessageUtil。buildHalfTopic());设置事务队列IDmsgInner。setQueueId(0);msgInner。setPropertiesString(MessageDecoder。messageProperties2String(msgInner。getProperties()));returnmsgI}}publicclassTransactionalMessageUtil{publicstaticStringbuildHalfTopic(){half消息主题returnTopicValidator。RMQSYSTRANSHALFTOPIC;}}结束事务 在进行了half消息发送和执行本地事务的操作后,消息暂存在Broker的half主题中,接下来生产者需要根据本地事务的执行结果,向Broker发送结束事务的请求,结束事务的方法endTransaction在DefaultMQProducerImpl中实现:构建结束事务的请求头EndTransactionRequestHeader判断本地事务执行状态:COMMITMESSAGE:表示提交事务,结束事务的请求头中设置TRANSACTIONCOMMITTYPE标识进行事务提交ROLLBACKMESSAGE:表示回滚事务,请求头中设置TRANSACTIONROLLBACKTYPE标识进行事务回滚UNKNOW:事务执行结果未知状态,请求头中设置TRANSACTIONNOTTYPE标识未知状态的事务调用endTransactionOneway向Broker发送结束事务的请求publicclassDefaultMQProducerImplimplementsMQProducerInner{publicvoidendTransaction(finalMessagemsg,finalSendResultsendResult,finalLocalTransactionStatelocalTransactionState,finalThrowablelocalException)throwsRemotingException,MQBrokerException,InterruptedException,UnknownHostException{消息finalMessageIif(sendResult。getOffsetMsgId()!null){idMessageDecoder。decodeMessageId(sendResult。getOffsetMsgId());}else{idMessageDecoder。decodeMessageId(sendResult。getMsgId());}获取事务IDStringtransactionIdsendResult。getTransactionId();获取Broker地址finalStringbrokerAddrthis。mQClientFactory。findBrokerAddressInPublish(sendResult。getMessageQueue()。getBrokerName());结束事务请求头EndTransactionRequestHeaderrequestHeadernewEndTransactionRequestHeader();设置事务IDrequestHeader。setTransactionId(transactionId);requestHeader。setCommitLogOffset(id。getOffset());判断本地事务状态switch(localTransactionState){caseCOMMITMESSAGE:如果提交requestHeader。setCommitOrRollback(MessageSysFlag。TRANSACTIONCOMMITTYPE);caseROLLBACKMESSAGE:如果是回滚requestHeader。setCommitOrRollback(MessageSysFlag。TRANSACTIONROLLBACKTYPE);caseUNKNOW:未知requestHeader。setCommitOrRollback(MessageSysFlag。TRANSACTIONNOTTYPE);default:}doExecuteEndTransactionHook(msg,sendResult。getMsgId(),brokerAddr,localTransactionState,false);requestHeader。setProducerGroup(this。defaultMQProducer。getProducerGroup());requestHeader。setTranStateTableOffset(sendResult。getQueueOffset());requestHeader。setMsgId(sendResult。getMsgId());StringremarklocalException!null?(executeLocalTransactionBranchexception:localException。toString()):发送结束事务的请求this。mQClientFactory。getMQClientAPIImpl()。endTransactionOneway(brokerAddr,requestHeader,remark,this。defaultMQProducer。getSendMsgTimeout());}}Broker事务结束请求处理 Broker对事务结束的请求处理在EndTransactionProcessor中:判断是否是从节点,从节点没有结束事务的权限,如果是从节点返回SLAVENOTAVAILABLE从请求头中获取事务的提交类型:TRANSACTIONCOMMITTYPE:表示提交事务,会调用commitMessage方法提交消息,如果提交成功调用endMessageTransaction结束事务,恢复消息的原始主题和队列并调用deletePrepareMessage方法删掉half消息TRANSACTIONROLLBACKTYPE:表示回滚事务,会调用rollbackMessage方法回滚事务,然后删掉half消息publicclassEndTransactionProcessorextendsAsyncNettyRequestProcessorimplementsNettyRequestProcessor{OverridepublicRemotingCommandprocessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{创建响应finalRemotingCommandresponseRemotingCommand。createResponseCommand(null);finalEndTransactionRequestHeaderrequestHeader(EndTransactionRequestHeader)request。decodeCommandCustomHeader(EndTransactionRequestHeader。class);LOGGER。debug(Transactionrequest:{},requestHeader);如果是从节点,从节点没有结束事务的权限,返回SLAVENOTAVAILABLEif(BrokerRole。SLAVEbrokerController。getMessageStoreConfig()。getBrokerRole()){response。setCode(ResponseCode。SLAVENOTAVAILABLE);LOGGER。warn(Messagestoreisslavemode,soendtransactionisforbidden。);}。。。OperationResultresultnewOperationResult();判断事务提交类型,如果是提交事务if(MessageSysFlag。TRANSACTIONCOMMITTYPErequestHeader。getCommitOrRollback()){提交消息resultthis。brokerController。getTransactionalMessageService()。commitMessage(requestHeader);if(result。getResponseCode()ResponseCode。SUCCESS){校验Prepare消息RemotingCommandrescheckPrepareMessage(result。getPrepareMessage(),requestHeader);if(res。getCode()ResponseCode。SUCCESS){结束事务,恢复消息的原始主题和队列MessageExtBrokerInnermsgInnerendMessageTransaction(result。getPrepareMessage());msgInner。setSysFlag(MessageSysFlag。resetTransactionValue(msgInner。getSysFlag(),requestHeader。getCommitOrRollback()));msgInner。setQueueOffset(requestHeader。getTranStateTableOffset());msgInner。setPreparedTransactionOffset(requestHeader。getCommitLogOffset());msgInner。setStoreTimestamp(result。getPrepareMessage()。getStoreTimestamp());MessageAccessor。clearProperty(msgInner,MessageConst。PROPERTYTRANSACTIONPREPARED);RemotingCommandsendResultsendFinalMessage(msgInner);if(sendResult。getCode()ResponseCode。SUCCESS){删除half消息this。brokerController。getTransactionalMessageService()。deletePrepareMessage(result。getPrepareMessage());}returnsendR}}}elseif(MessageSysFlag。TRANSACTIONROLLBACKTYPErequestHeader。getCommitOrRollback()){如果是回滚回滚消息resultthis。brokerController。getTransactionalMessageService()。rollbackMessage(requestHeader);if(result。getResponseCode()ResponseCode。SUCCESS){RemotingCommandrescheckPrepareMessage(result。getPrepareMessage(),requestHeader);if(res。getCode()ResponseCode。SUCCESS){删除half消息this。brokerController。getTransactionalMessageService()。deletePrepareMessage(result。getPrepareMessage());}}}response。setCode(result。getResponseCode());response。setRemark(result。getResponseRemark());}}删除half消息 由于CommitLog追加写的性质,RocketMQ并不会直接将half消息从CommitLog中删除,而是使用了另外一个OP主题RMQSYSTRANSOPHALFTOPIC(以下简称OP主题队列),将已经提交回滚的消息记录在OP主题队列中:publicclassTransactionalMessageServiceImplimplementsTransactionalMessageService{OverridepublicbooleandeletePrepareMessage(MessageExtmsgExt){添加到OP消息队列if(this。transactionalMessageBridge。putOpMessage(msgExt,TransactionalMessageUtil。REMOVETAG)){log。debug(Transactionopmessagewritesuccessfully。messageId{},queueId{}msgExt:{},msgExt。getMsgId(),msgExt。getQueueId(),msgExt);}else{log。error(Transactionopmessagewritefailed。messageIdis{},queueIdis{},msgExt。getMsgId(),msgExt。getQueueId());}}} putOpMessage方法在TransactionalMessageBridge中实现,它又调用了addRemoveTagInTransactionOp方法向OP队列中添加消息:构建OP消息,主要是创建Message对象,然后设置主题为RMQSYSTRANSOPHALFTOPIC,设置half消息在队列的偏移量调用writeOp方法将消息写入OP队列,makeOpMessageInner方法用于构建消息体,然后调用putMessage放将消息写入CommitLogpublicclassTransactionalMessageBridge{privatefinalConcurrentHashMapMessageQueue,MessageQueueopQueueMapnewConcurrentHashMap();publicbooleanputOpMessage(MessageExtmessageExt,StringopType){构建消息队列,设置消息所属主题、Broker名称、队列ID信息MessageQueuemessageQueuenewMessageQueue(messageExt。getTopic(),this。brokerController。getBrokerConfig()。getBrokerName(),messageExt。getQueueId());if(TransactionalMessageUtil。REMOVETAG。equals(opType)){添加OP消息returnaddRemoveTagInTransactionOp(messageExt,messageQueue);}}当事务消息进行提交或者回滚时,记录在operation队列中(OP队列)privatebooleanaddRemoveTagInTransactionOp(MessageExtprepareMessage,MessageQueuemessageQueue){构建OP消息,主题为RMQSYSTRANSOPHALFTOPICMessagemessagenewMessage(TransactionalMessageUtil。buildOpTopic(),TransactionalMessageUtil。REMOVETAG,String。valueOf(prepareMessage。getQueueOffset())。getBytes(TransactionalMessageUtil。charset));将消息写入OP队列writeOp(message,messageQueue);}privatevoidwriteOp(Messagemessage,MessageQueuemq){MessageQueueopQ如果已经添加过if(opQueueMap。containsKey(mq)){opQueueopQueueMap。get(mq);}else{opQueuegetOpQueueByHalf(mq);MessageQueueoldQueueopQueueMap。putIfAbsent(mq,opQueue);if(oldQueue!null){opQueueoldQ}}如果为空if(opQueuenull){创建opQueuenewMessageQueue(TransactionalMessageUtil。buildOpTopic(),mq。getBrokerName(),mq。getQueueId());}构建OP消息添加到OP队列中putMessage(makeOpMessageInner(message,opQueue));}}事务状态检查 由于各种原因有可能未成功收到提交回滚事务的请求,所以RocketMQ需要定期检查half消息,检查事务的执行结果,TransactionalMessageCheckService用于half消息状态的检查,它实现了ServiceThread,默认可以看到在onWaitEnd方法中调用了check方法进行状态检查:publicclassTransactionalMessageCheckServiceextendsServiceThread{OverrideprotectedvoidonWaitEnd(){longtimeoutbrokerController。getBrokerConfig()。getTransactionTimeOut();intcheckMaxbrokerController。getBrokerConfig()。getTransactionCheckMax();longbeginSystem。currentTimeMillis();log。info(Begintocheckpreparemessage,begintime:{},begin);状态检查this。brokerController。getTransactionalMessageService()。check(timeout,checkMax,this。brokerController。getTransactionalMessageCheckListener());log。info(Endtocheckpreparemessage,consumedtime:{},System。currentTimeMillis()begin);}} check方法在TransactionalMessageServiceImpl中实现:publicclassTransactionalMessageServiceImplimplementsTransactionalMessageService{Overridepublicvoidcheck(longtransactionTimeout,inttransactionCheckMax,AbstractTransactionalMessageCheckListenerlistener){try{StringtopicTopicValidator。RMQSYSTRANSHALFTOPIC;根据主题获取消息队列SetMessageQueuemsgQueuestransactionalMessageBridge。fetchMessageQueues(topic);if(msgQueuesnullmsgQueues。size()0){log。warn(Thequeueoftopicisempty:topic);}log。debug(Checktopic{},queues{},topic,msgQueues);遍历所有的消息队列for(MessageQueuemessageQueue:msgQueues){获取当前时间做为开始时间longstartTimeSystem。currentTimeMillis();获取对应的OP消息队列MessageQueueopQueuegetOpQueue(messageQueue);获取half消息队列的消费进度longhalfOffsettransactionalMessageBridge。fetchConsumeOffset(messageQueue);获取op消息队列的消费进度longopOffsettransactionalMessageBridge。fetchConsumeOffset(opQueue);log。info(Beforecheck,thequeue{}msgOffset{}opOffset{},messageQueue,halfOffset,opOffset);如果消费进度小于0表示不合法if(halfOffset0opOffset0){log。error(MessageQueue:{}illegaloffsetread:{},opoffset:{},skipthisqueue,messageQueue,halfOffset,opOffset);}存储已处理的消息ListLongdoneOpOffsetnewArrayList();HashMapLong,LongremoveMapnewHashMap();根据当前的消费进度从已处理队列中拉取消息PullResultpullResultfillOpRemoveMap(removeMap,opQueue,opOffset,halfOffset,doneOpOffset);如果拉取消息为空,打印错误继续处理下一个消息队列if(nullpullResult){log。error(Thequeue{}checkmsgOffset{}withopOffset{}failed,pullResultisnull,messageQueue,halfOffset,opOffset);}获取消息为空的数量默认为1intgetMessageNullCount1;新的进度longnewOffsethalfO获取half队列的消费进度,赋值给ilongihalfOwhile(true){如果当前时间减去开始时间大于最大处理时间限制,终止循环if(System。currentTimeMillis()startTimeMAXPROCESSTIMELIMIT){log。info(Queue{}processtimereachmax{},messageQueue,MAXPROCESSTIMELIMIT);}如果OP队列中包含当前偏移量,表示消息已经被处理,加入到已处理集合中if(removeMap。containsKey(i)){log。debug(Halfoffset{}hasbeencommittedrolledback,i);LongremovedOpOffsetremoveMap。remove(i);加入到doneOpOffset集合中doneOpOffset。add(removedOpOffset);}else{如果已处理队列中不包含当前消息根据偏移量从half队列获取half消息GetResultgetResultgetHalfMsg(messageQueue,i);获取消息对象MessageExtmsgExtgetResult。getMsg();如果获取消息为空if(msgExtnull){判断获取空消息的次数是否大于MAXRETRYCOUNTWHENHALFNULLif(getMessageNullCountMAXRETRYCOUNTWHENHALFNULL){}判断从half队列获取消息的结果是NONEWMSG,表示没有消息,此时终止循环等待下一次进行检查if(getResult。getPullResult()。getPullStatus()PullStatus。NONEWMSG){log。debug(Nonewmsg,themissoffset{}in{},continuecheck{},pullresult{},i,messageQueue,getMessageNullCount,getResult。getPullResult());}else{log。info(Illegaloffset,themissoffset{}in{},continuecheck{},pullresult{},i,messageQueue,getMessageNullCount,getResult。getPullResult());走到这里说明消息的偏移量不合法,继续获取下一条消息进行处理igetResult。getPullResult()。getNextBeginOffset();newO}}是否需要丢弃消息或者需要跳过消息if(needDiscard(msgExt,transactionCheckMax)needSkip(msgExt)){listener。resolveDiscardMsg(msgExt);继续处理下一条消息newOffseti1;i;}如果消息的添加时间是否大于等于本次检查的开始时间,说明是在检查开始之后加入的消息,暂不进行处理if(msgExt。getStoreTimestamp()startTime){log。debug(Freshstored。themissoffset{},checkitlater,store{},i,newDate(msgExt。getStoreTimestamp()));}计算half消息在队列中的保留时间:当前时间减去消息加入的时间longvalueOfCurrentMinusBornSystem。currentTimeMillis()msgExt。getBornTimestamp();事务超时时间longcheckImmunityTimetransactionT获取PROPERTYCHECKIMMUNITYTIMEINSECONDS属性,表示事务回查最晚的时间StringcheckImmunityTimeStrmsgExt。getUserProperty(MessageConst。PROPERTYCHECKIMMUNITYTIMEINSECONDS);如果PROPERTYCHECKIMMUNITYTIMEINSECONDS属性不为空if(null!checkImmunityTimeStr){获取事务回查最晚检查时间,如果checkImmunityTimeStr为1则返回事务超时时间,否则返回checkImmunityTimeStr转为long后乘以1000得到的值checkImmunityTimegetImmunityTime(checkImmunityTimeStr,transactionTimeout);如果消息的保留时间小于事务回查最晚检查时间if(valueOfCurrentMinusBorncheckImmunityTime){检查half消息在队列中的偏移量,如果返回true跳过本条消息if(checkPrepareQueueOffset(removeMap,doneOpOffset,msgExt)){处理下一个消息newOffseti1;i;}}}else{如果valueOfCurrentMinusBorn小于checkImmunityTimeif((0valueOfCurrentMinusBorn)(valueOfCurrentMinusBorncheckImmunityTime)){log。debug(Newarrived,themissoffset{},checkitlatercheckImmunity{},born{},i,checkImmunityTime,newDate(msgExt。getBornTimestamp()));}}获取OP消息ListMessageExtopMsgpullResult。getMsgFoundList();判断是否需要检查,满足检查的条件为以下三种情况之一:1。拉取消息为空并且消息的保留时间已经大于事务设置的最晚回查时间2。拉取消息不为空并且拉取到的最后一条消息的存入时间减去当前时间超过了事务的超时时间3。half消息存留时间为负数booleanisNeedCheck(opMsgnullvalueOfCurrentMinusBorncheckImmunityTime)(opMsg!null(opMsg。get(opMsg。size()1)。getBornTimestamp()startTimetransactionTimeout))(valueOfCurrentMinusBorn1);如果需要进行回查if(isNeedCheck){将half消息重新加入到队列中if(!putBackHalfMsgQueue(msgExt,i)){}发送回查请求listener。resolveHalfMsg(msgExt);}else{继续从OP队列中拉取消息pullResultfillOpRemoveMap(removeMap,opQueue,pullResult。getNextBeginOffset(),halfOffset,doneOpOffset);log。debug(Themissoffset:{}inmessageQueue:{}needtogetmoreopMsg,resultis:{},i,messageQueue,pullResult);}}加1继续处理下一条消息newOffseti1;i;}if(newOffset!halfOffset){更新消费进度transactionalMessageBridge。updateConsumeOffset(messageQueue,newOffset);}longnewOpOffsetcalculateOpOffset(doneOpOffset,opOffset);if(newOpOffset!opOffset){更新处理进度transactionalMessageBridge。updateConsumeOffset(opQueue,newOpOffset);}}}catch(Throwablee){log。error(Checkerror,e);}}} 在check方法中会获取half主题(RMQSYSTRANSHALFTOPIC)下的所有消息队列,遍历所有的half消息队列,对队列中的half消息进行处理,主要步骤如下。 一、构建OP队列的消息队列对象MessageQueue 调用getOpQueue获取当前half消息队列对应的OP队列的MessageQueue对象,实际上是创建了一个MessageQueue对象,设置为OP队列的主题、以及Broker名称和队列的ID,在后面获取消费进度时使用:privateMessageQueuegetOpQueue(MessageQueuemessageQueue){获取OP消息队列MessageQueueopQueueopQueueMap。get(messageQueue);if(opQueuenull){如果获取为空,则创建MessageQueue,主题设置为OPTOPIC,设置Broker名称和队列IDopQueuenewMessageQueue(TransactionalMessageUtil。buildOpTopic(),messageQueue。getBrokerName(),messageQueue。getQueueId());加入到opQueueMap中opQueueMap。put(messageQueue,opQueue);}returnopQ} 二、获取half队列的消费进度和OP消费队列的消费进度 消费进度的获取是通过调用transactionalMessageBridge的fetchConsumeOffset方法进行查询的,可以看到方法的入参是MessageQueue类型的,所以第一步需要构造OP队列的MessageQueue对象,在这一步查询消费进度使用:publiclongfetchConsumeOffset(MessageQueuemq){longoffsetbrokerController。getConsumerOffsetManager()。queryOffset(TransactionalMessageUtil。buildConsumerGroup(),mq。getTopic(),mq。getQueueId());if(offset1){offsetstore。getMinOffsetInQueue(mq。getTopic(),mq。getQueueId());}} 三、从OP队列中拉取消息 调用fillOpRemoveMap方法根据消费进度信息从OP队列中拉取消息,将拉取的消费放入removeMap中,用于判断half消息是否已经处理:privatePullResultfillOpRemoveMap(HashMapLong,LongremoveMap,MessageQueueopQueue,longpullOffsetOfOp,longminiOffset,ListLongdoneOpOffset){从OP队列中拉取消息,每次拉取32条PullResultpullResultpullOpMsg(opQueue,pullOffsetOfOp,32);如果拉取为空返回nullif(nullpullResult){}如果拉取状态为消费进度不合法或者没有匹配的消息if(pullResult。getPullStatus()PullStatus。OFFSETILLEGALpullResult。getPullStatus()PullStatus。NOMATCHEDMSG){log。warn(Themissopoffset{}inqueue{}isillegal,pullResult{},pullOffsetOfOp,opQueue,pullResult);从拉取结果中获取消费进度并更新transactionalMessageBridge。updateConsumeOffset(opQueue,pullResult。getNextBeginOffset());returnpullR}elseif(pullResult。getPullStatus()PullStatus。NONEWMSG){如果没有消息log。warn(Themissopoffset{}inqueue{}isNONEWMSG,pullResult{},pullOffsetOfOp,opQueue,pullResult);returnpullR}获取拉取到的消息ListMessageExtopMsgpullResult。getMsgFoundList();if(opMsgnull){如果为空打印日志log。warn(Themissopoffset{}inqueue{}isempty,pullResult{},pullOffsetOfOp,opQueue,pullResult);returnpullR}遍历拉取的消息for(MessageExtopMessageExt:opMsg){获取队列中的偏移量LongqueueOffsetgetLong(newString(opMessageExt。getBody(),TransactionalMessageUtil。charset));log。debug(Topic:{}tags:{},OpOffset:{},HalfOffset:{},opMessageExt。getTopic(),opMessageExt。getTags(),opMessageExt。getQueueOffset(),queueOffset);if(TransactionalMessageUtil。REMOVETAG。equals(opMessageExt。getTags())){如果偏移量小于最小的偏移量if(queueOffsetminiOffset){加入到doneOpOffset中doneOpOffset。add(opMessageExt。getQueueOffset());}else{加入到已处理消息的集合removeMap中removeMap。put(queueOffset,opMessageExt。getQueueOffset());}}else{log。error(FoundaillegaltaginopMessageExt{},opMessageExt);}}log。debug(Removemap:{},removeMap);log。debug(Doneoplist:{},doneOpOffset);returnpullR} 四、处理每一个half消息 开启while循环,从half队列的消费进度处开始,处理每一个half消息:如果当前时间减去检查开始时间大于最大处理时间,此时终止循环如果removeMap中包含当前half消息,表示消息已经被处理,放入到已处理消息集合中doneOpOffset如果removeMap不包含当前half消息,调用getHalfMsg方法根据偏移量从half队列获取half消息,如果消息获取不为空继续下一步,否则进行如下处理判断获取空消息的个数是否大于MAXRETRYCOUNTWHENHALFNULL,如果大于将终止本次循环,处理下一个half消息队列判断拉取消息的状态是否为NONEWMSG,如果是表示队列中没有消息,先终止循环如果拉取消息的状态是不是NONEWMSG,表示消费进度不合法,获取half消息队列中下一条消息进行处理调用needDiscard判断是否需要丢弃half消息,或者调用needSkip判断是否需要跳过当前half消息:needDiscard是根据half消息的检查次数是否超过最大限制来决定是否丢弃half消息privatebooleanneedDiscard(MessageExtmsgExt,inttransactionCheckMax){从属性中获取检查次数StringcheckTimesmsgExt。getProperty(MessageConst。PROPERTYTRANSACTIONCHECKTIMES);intcheckTime1;如果不为空if(null!checkTimes){checkTimegetInt(checkTimes);如果检查次数大于事务最大的检查次数,表示需要丢弃if(checkTimetransactionCheckMax){}else{检查次数加一checkT}}更新检查次数msgExt。putUserProperty(MessageConst。PROPERTYTRANSACTIONCHECKTIMES,String。valueOf(checkTime));}needSkip是根据half消息在队列中的存留时间是否超过了最大的保留时间限制来决定是否跳过privatebooleanneedSkip(MessageExtmsgExt){计算half消息在队列中的保留时间longvalueOfCurrentMinusBornSystem。currentTimeMillis()msgExt。getBornTimestamp();如果大于Broker中设置的最大保留时间,表示需要跳过if(valueOfCurrentMinusBorntransactionalMessageBridge。getBrokerController()。getMessageStoreConfig()。getFileReservedTime()3600L1000){log。info(Halfmessageexceedfilereservedtime,soskipit。messageId{},bornTime{},msgExt。getMsgId(),msgExt。getBornTimestamp());}}判断消息的的存入时间是否大于本次开始检查的时间,如果大于说明是新加入的消息,由于事务消息发送后不会立刻提交,所以此时暂不需要进行检查,中断循环即可计算half消息在队列中的存留时间valueOfCurrentMinusBorn:当前时间消息存入的时间设置立刻回查事务状态的时间checkImmunityTime:事务的超时时间从消息属性中获取PROPERTYCHECKIMMUNITYTIMEINSECONDS属性的值放在checkImmunityTimeStr变量中,表示事务的最晚回查时间:(1)如果checkImmunityTimeStr获取不为空,调用getImmunityTime方法计算事务立刻回查时间,并赋值给checkImmunityTime,从代码中可以看出如果checkImmunityTimeStr为1则返回事务的超时时间,否则返回checkImmunityTimeStr的值并乘以1000转为秒:privatelonggetImmunityTime(StringcheckImmunityTimeStr,longtransactionTimeout){longcheckImmunityT转为longcheckImmunityTimegetLong(checkImmunityTimeStr);if(1checkImmunityTime){如果为1,使用事务的超时时间checkImmunityTimetransactionT}else{checkImmunityTime1000;使用checkImmunityTime,乘以1000转为秒}returncheckImmunityT}计算完checkImmunityTime的值后,判断valueOfCurrentMinusBorn是否小于checkImmunityTime,如果是表明还未到事务的超时时间,此时调用checkPrepareQueueOffset检查half消息在队列中的偏移量,根据检查结果判断是否需要跳过当前消息:如果PROPERTYTRANSACTIONPREPAREDQUEUEOFFSET属性获取为空,调用putImmunityMsgBackToHalfQueue将消息重新加入half队列,如果返回true表示加入成功,此时向前推荐消费进度,处理下一条消息,如果加入失败会继续循环处理本条消息(因为进度未向前推进)如果PROPERTYTRANSACTIONPREPAREDQUEUEOFFSET属性获取不为空,转为long型,判断OP队列中是否已经包含当前消息的偏移量,如果包含加入到doneOpOffset中并返回true,此时向前推进消费进度,处理下一条消息,否则同样调用putImmunityMsgBackToHalfQueue将消息重新加入half队列,并根据加入成功与否判断是否继续处理下一条消息总结如果事务设置了PROPERTYCHECKIMMUNITYTIMEINSECONDS属性,并且half消息的存留时间小于立刻检查事务的时间,说明还未到时间不需要进行状态检查,此时获取消息在half队列的偏移量,如果获取为空,将消息重新加入到half队列中,如果获取不为空判断是否已经在OP处理队列中,如果返回true处理下一个消息即可,否则同样将消息重新加入half队列中。RocketMQ在事务未到最晚回查时间时将消息重新加入了half消息队列,因为加入之后half队列的消费进度会往前推进并在回查结束时更新进度,所以下次检查时并不会检查到旧的half消息。privatebooleancheckPrepareQueueOffset(HashMapLong,LongremoveMap,ListdoneOpOffset,MessageExtmsgExt){从属性中获取消息在half队列的偏移量StringprepareQueueOffsetStrmsgExt。getUserProperty(MessageConst。PROPERTYTRANSACTIONPREPAREDQUEUEOFFSET);如果为空,调用putImmunityMsgBackToHalfQueue将消息重新加入half队列if(nullprepareQueueOffsetStr){returnputImmunityMsgBackToHalfQueue(msgExt);}else{转为longlongprepareQueueOffsetgetLong(prepareQueueOffsetStr);如果为1,返回false,等待下次循环进行处理if(1prepareQueueOffset){}else{如果OP队列中已经包含当前消息的偏移量if(removeMap。containsKey(prepareQueueOffset)){longtmpOpOffsetremoveMap。remove(prepareQueueOffset);加入到已完成的消息集合中doneOpOffset。add(tmpOpOffset);}else{将消息重新加入half队列returnputImmunityMsgBackToHalfQueue(msgExt);}}}}(2)如果checkImmunityTimeStr获取为空,判断valueOfCurrentMinusBorn(消息存留时间)是否大于等于0并且小于checkImmunityTime(事务超时时间),如果满足条件表示新加入的消息并且还未过事务的超时时间,此时终止循环暂不进行回查,否则进入下一步判断是否需要进行状态回查isNeedCheck,满足检查的条件为以下三种情况之一:(1)从OP队列中拉取消息为空并且当前half消息的存留时间已经大于事务设置的最晚回查时间opMsgnullvalueOfCurrentMinusBorncheckImmunityTime(2)从OP队列中拉取的消息不为空,并且拉取的最后一条消息的存入时间减去本次开始检查时间大于事务的超时时间opMsg!null(opMsg。get(opMsg。size()1)。getBornTimestamp()startTimetransactionTimeout)(3)half消息在队列中的保留时间小于等于1,说明加入half消息的时间大于本次开始检查的时间valueOfCurrentMinusBorn1根据isNeedCheck判断是否需要回查(1)需要回查:调用putBackHalfMsgQueue将half消息重新加入到队列中,如果加入失败继续循环再次处理,如果加入成功调用resolveHalfMsg发送回查请求(2)不需要回查:调用fillOpRemoveMap继续从OP队列中拉取消息判断更新i的值,继续处理下一个half消息 五、更新消费进度 主要是更half队列和OP队列的消费进度。重新添加half消息 从putBackHalfMsgQueue方法中可以看出将消息重新加入到了half队列:privatebooleanputBackHalfMsgQueue(MessageExtmsgExt,longoffset){重新将消息入到half消息队列中PutMessageResultputMessageResultputBackToHalfQueueReturnResult(msgExt);如果加入成功if(putMessageResult!nullputMessageResult。getPutMessageStatus()PutMessageStatus。PUTOK){设置消息的逻辑偏移量msgExt。setQueueOffset(putMessageResult。getAppendMessageResult()。getLogicsOffset());设置消息在CommitLog的偏移量msgExt。setCommitLogOffset(putMessageResult。getAppendMessageResult()。getWroteOffset());设消息IDmsgExt。setMsgId(putMessageResult。getAppendMessageResult()。getMsgId());log。debug(Sendcheckmessage,theoffset{}restoredinqueueOffset{}commitLogOffset{}newMsgId{}realMsgId{}topic{},offset,msgExt。getQueueOffset(),msgExt。getCommitLogOffset(),msgExt。getMsgId(),msgExt。getUserProperty(MessageConst。PROPERTYUNIQCLIENTMESSAGEIDKEYIDX),msgExt。getTopic());}else{加入失败log。error(PutBackToHalfQueueReturnResultwritefailed,topic:{},queueId:{},msgId:{},msgExt。getTopic(),msgExt。getQueueId(),msgExt。getMsgId());}}状态回查请求发送 resolveHalfMsg方法中向客户端发送事务状态回查的请求,可以看到是通过线程池异步实现的:publicabstractclassAbstractTransactionalMessageCheckListener{publicvoidresolveHalfMsg(finalMessageExtmsgExt){executorService。execute(newRunnable(){Overridepublicvoidrun(){try{发送状态回查请求sendCheckMessage(msgExt);}catch(Exceptione){LOGGER。error(Sendcheckmessageerror!,e);}}});}} sendCheckMessage方法在AbstractTransactionalMessageCheckListener中实现,主要是构建请求信息,然后向消息的生产者发送事务状态回查的请求:publicabstractclassAbstractTransactionalMessageCheckListener{publicvoidsendCheckMessage(MessageExtmsgExt)throwsException{构建回查请求头CheckTransactionStateRequestHeadercheckTransactionStateRequestHeadernewCheckTransactionStateRequestHeader();checkTransactionStateRequestHeader。setCommitLogOffset(msgExt。getCommitLogOffset());设置Commitlog偏移量checkTransactionStateRequestHeader。setOffsetMsgId(msgExt。getMsgId());checkTransactionStateRequestHeader。setMsgId(msgExt。getUserProperty(MessageConst。PROPERTYUNIQCLIENTMESSAGEIDKEYIDX));checkTransactionStateRequestHeader。setTransactionId(checkTransactionStateRequestHeader。getMsgId());checkTransactionStateRequestHeader。setTranStateTableOffset(msgExt。getQueueOffset());设置消息实际的TOPICmsgExt。setTopic(msgExt。getUserProperty(MessageConst。PROPERTYREALTOPIC));设置消息实际的队列IDmsgExt。setQueueId(Integer。parseInt(msgExt。getUserProperty(MessageConst。PROPERTYREALQUEUEID)));msgExt。setStoreSize(0);StringgroupIdmsgExt。getProperty(MessageConst。PROPERTYPRODUCERGROUP);获取channelChannelchannelbrokerController。getProducerManager()。getAvailableChannel(groupId);if(channel!null){发送回查请求brokerController。getBroker2Client()。checkProducerTransactionState(groupId,channel,checkTransactionStateRequestHeader,msgExt);}else{LOGGER。warn(Checktransactionfailed,channelisnull。groupId{},groupId);}}}事务状态回查请求处理 事务状态回查请求的处理在ClientRemotingProcessor中,如果请求类型是CHECKTRANSACTIONSTATE表示是事务状态回查请求,调用checkTransactionState方法进行事务状态检查:从请求中获取消息,判断消息是否为空,不为空进入下一步从消息属性中获取生产者组名称,如果不为空进入下一步根据生产者组名称获取MQProducerInner对象,然后调用checkTransactionState进行状态检查publicclassClientRemotingProcessorextendsAsyncNettyRequestProcessorimplementsNettyRequestProcessor{OverridepublicRemotingCommandprocessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{switch(request。getCode()){caseRequestCode。CHECKTRANSACTIONSTATE:检查事务状态returnthis。checkTransactionState(ctx,request);。。。default:}}publicRemotingCommandcheckTransactionState(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{finalCheckTransactionStateRequestHeaderrequestHeader(CheckTransactionStateRequestHeader)request。decodeCommandCustomHeader(CheckTransactionStateRequestHeader。class);finalByteBufferbyteBufferByteBuffer。wrap(request。getBody());获取消息finalMessageExtmessageExtMessageDecoder。decode(byteBuffer);如果消息不为空if(messageExt!null){。。。获取事务IDStringtransactionIdmessageExt。getProperty(MessageConst。PROPERTYUNIQCLIENTMESSAGEIDKEYIDX);if(null!transactionId!。equals(transactionId)){messageExt。setTransactionId(transactionId);}获取生产者组finalStringgroupmessageExt。getProperty(MessageConst。PROPERTYPRODUCERGROUP);if(group!null){获取MQProducerInnerMQProducerInnerproducerthis。mqClientFactory。selectProducer(group);if(producer!null){finalStringaddrRemotingHelper。parseChannelRemoteAddr(ctx。channel());调用checkTransactionState进行状态检查producer。checkTransactionState(addr,messageExt,requestHeader);}else{log。debug(checkTransactionState,pickproducerbygroup〔{}〕failed,group);}}else{log。warn(checkTransactionState,pickproducergroupfailed);}}else{log。warn(checkTransactionState,decodemessagefailed);}}} checkTransactionState方法在DefaultMQProducerImpl中实现,可以看到它创建了Runnable对象,然后提交到线程池中异步执行事务的状态检查,检查的主要逻辑如下:获取TransactionCheckListener(已废弃)类型的事务监听器获取TransactionListener类型的事务监听器如果TransactionCheckListener和TransactionListener其中之一不为空,调用checkLocalTransaction进行状态检查调用processTransactionState处理事务查询结果,这一步主要是根据事务的查询结果构建请求信息,然后调用endTransactionOneway方法向Broker发送结束事务的请求publicclassDefaultMQProducerImplimplementsMQProducerInner{OverridepublicvoidcheckTransactionState(finalStringaddr,finalMessageExtmsg,finalCheckTransactionStateRequestHeaderheader){RunnablerequestnewRunnable(){。。。Overridepublicvoidrun(){获取TransactionCheckListener监听器(已不推荐使用)TransactionCheckListenertransactionCheckListenerDefaultMQProducerImpl。this。checkListener();获取事务监听器TransactionListenertransactionListenergetCheckListener();如果其中之一不为空if(transactionCheckListener!nulltransactionListener!null){初始化为UNKNOW状态LocalTransactionStatelocalTransactionStateLocalTransactionState。UNKNOW;Ttry{if(transactionCheckListener!null){localTransactionStatetransactionCheckListener。checkLocalTransactionState(message);}elseif(transactionListener!null){log。debug(UsednewcheckAPIintransactionmessage);调用checkLocalTransaction回查状态localTransactionStatetransactionListener。checkLocalTransaction(message);}else{log。warn(CheckTransactionState,picktransactionListenerbygroup〔{}〕failed,group);}}catch(Throwablee){log。error(BrokercallcheckTransactionState,butcheckLocalTransactionStateexception,e);}处理事务状态this。processTransactionState(localTransactionState,group,exception);}else{log。warn(CheckTransactionState,picktransactionCheckListenerbygroup〔{}〕failed,group);}}处理事务状态privatevoidprocessTransactionState(finalLocalTransactionStatelocalTransactionState,finalStringproducerGroup,finalThrowableexception){构建结束事务的请求头finalEndTransactionRequestHeaderthisHeadernewEndTransactionRequestHeader();设置tCommitLog的偏移量thisHeader。setCommitLogOffset(checkRequestHeader。getCommitLogOffset());thisHeader。setProducerGroup(producerGroup);设置生产者组thisHeader。setTranStateTableOffset(checkRequestHeader。getTranStateTableOffset());thisHeader。setFromTransactionCheck(true);设置状态检查为true。。。thisHeader。setTransactionId(checkRequestHeader。getTransactionId());设置事务IDswitch(localTransactionState){caseCOMMITMESSAGE:thisHeader。setCommitOrRollback(MessageSysFlag。TRANSACTIONCOMMITTYPE);设置为提交caseROLLBACKMESSAGE:thisHeader。setCommitOrRollback(MessageSysFlag。TRANSACTIONROLLBACKTYPE);设置为回滚log。warn(whenbrokercheck,clientrollbackthistransaction,{},thisHeader);caseUNKNOW:thisHeader。setCommitOrRollback(MessageSysFlag。TRANSACTIONNOTTYPE);设置为未知log。warn(whenbrokercheck,clientdoesnotknowthistransactionstate,{},thisHeader);default:}。。。执行结束事务钩子函数doExecuteEndTransactionHook(msg,uniqueKey,brokerAddr,localTransactionState,true);try{向Broker发送消息的回查结果DefaultMQProducerImpl。this。mQClientFactory。getMQClientAPIImpl()。endTransactionOneway(brokerAddr,thisHeader,remark,3000);}catch(Exceptione){log。error(endTransactionOnewayexception,e);}}};提交到线程池中执行任务this。checkExecutor。submit(request);}} 总结