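Contents
- Seata global transaction source code
- Design of Seata AT mode
- Source entry point
- TransactionalTemplate
- Starting a global transaction
- TM starts the global transaction
- TC handles the TM request
- Global transaction commit
- TM in the microservice sends the request
- TC handles the TM request
- RM handles the TC request
- Global transaction rollback
- TM sends the request
- TC handles the TM request
- RM handles the TC request
- Supplementary notes
- How a microservice locates the TC service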
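Seata global transaction source code
The Seata source version analyzed here is 1.7.0.
Source download path
Design of Seata AT mode
Phase one
This is the main execution flow of a branch transaction; it is outside the scope of this chapter's source analysis.
Business data and the rollback log record are committed in the same local transaction, after which the local lock and the connection are released. The core is parsing the business SQL, converting it into an undo log, and persisting both together.
Phase two
If the distributed transaction succeeds, the TC notifies each RM to delete its undo log asynchronously.
If the distributed transaction fails, the TM sends a rollback request to the TC. The RM receives the rollback request from the coordinator (TC), locates the corresponding rollback log record by XID and Branch ID, generates the reverse update SQL from that record, and executes it to roll back the branch.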
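To make the phase-two rollback more concrete, here is a minimal sketch of turning a before image into a compensating UPDATE statement. `UndoRecord` and `UndoSketch.buildReverseUpdate` are hypothetical names invented for this illustration. Seata's real undo log stores serialized before and after images and performs extra checks before rolling back, none of which is modeled here.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical, simplified stand-in for one undo log entry (not Seata's real format).
final class UndoRecord {
    final String table;
    final String pkColumn;
    final Object pkValue;
    final Map<String, Object> beforeImage; // column -> value before the business UPDATE

    UndoRecord(String table, String pkColumn, Object pkValue, Map<String, Object> beforeImage) {
        this.table = table;
        this.pkColumn = pkColumn;
        this.pkValue = pkValue;
        this.beforeImage = beforeImage;
    }
}

final class UndoSketch {
    // Builds a parameterized reverse UPDATE that would restore the before image.
    // Parameters would be bound in the iteration order of beforeImage, then the primary key.
    static String buildReverseUpdate(UndoRecord record) {
        StringJoiner sets = new StringJoiner(", ");
        for (String column : record.beforeImage.keySet()) {
            sets.add(column + " = ?");
        }
        return "UPDATE " + record.table + " SET " + sets + " WHERE " + record.pkColumn + " = ?";
    }
}
```

An insertion-ordered map such as `LinkedHashMap` keeps the column order of the generated SQL stable, which matters when the parameters are bound positionally.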
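Seata lifecycle diagram
Key points:
- Seata works through AOP: wrapIfNecessary() --> MethodInterceptor --> GlobalTransactionalInterceptor --> TransactionalTemplate
- Everything about the global transaction is driven by the TM. It sends the TC a request to begin the global transaction and receives the XID; once the business method has finished, it sends the TC a global commit or global rollback request.
- The TC then calls the interface exposed by the RM, AbstractRMHandler#handle, to commit or roll back the branches of the global transaction.
- The RM operates on the undo_log table.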
Source entry point
Seata client startup flow (online flowchart)
Basic steps for using Seata in a microservice
Add the dependency
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
Configuration file
Annotate the method that makes the remote calls with @GlobalTransactional to start the global transaction; downstream services only need @Transactional.
@GlobalTransactional(name = "createOrder", rollbackFor = Exception.class)
public Order saveOrder(OrderVo orderVo) {
    ......
    storageFeignService.deduct(..);
    accountFeignService.debit(..);
}
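Starting from the Maven dependency, find the spring.factories file, then the auto-configuration class, and from there the core bean.
The auto-configuration class SeataAutoConfiguration registers a GlobalTransactionScanner bean in the Spring container.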
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        // Called when a global transaction fails to roll back. The default implementation
        // only logs; define your own FailureHandler to extend it.
        return new DefaultFailureHandlerImpl();
    }

    // Creates the GlobalTransactionScanner
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public static GlobalTransactionScanner globalTransactionScanner(...) {
        ......
        return new GlobalTransactionScanner(...);
    }
}
Next, look at the GlobalTransactionScanner class:
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    ......
}
Its parent class AbstractAutoProxyCreator tells us that it creates AOP proxies, so the core method to read is GlobalTransactionScanner#wrapIfNecessary.
Its InitializingBean interface tells us that it also runs initialization logic, in GlobalTransactionScanner#afterPropertiesSet.
Start with the initialization method:
- It calls initClient()
- initClient() calls the init() methods that initialize the TM and the RM
// Apart from the "disabled" check, this method only calls initClient()
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)this);
        return;
    }
    if (initialized.compareAndSet(false, true)) {
        // Initialize the clients
        initClient();
    }
}

private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
        LOGGER.warn(...);
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(...);
    }
    // init TM: initialize the transaction manager
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    // init RM: initialize the resource manager
    RMClient.init(applicationId, txServiceGroup);
    registerSpringShutdownHook();
}
Stepping into the TM and RM init() methods shows that both are built on Netty, so there must be inbound handlers that process requests. Take the RM as an example:
public class RMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        // DefaultRMHandler is an implementation of the RMInboundHandler interface.
        // It handles BranchCommitRequest, BranchRollbackRequest and UndoLogDeleteRequest.
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        rmNettyRemotingClient.init();
    }
}

// Branch commit and branch rollback requests sent by the TC are normally handled
// by AbstractRMHandler, an implementation of this interface
public interface RMInboundHandler {
    BranchCommitResponse handle(BranchCommitRequest request);

    BranchRollbackResponse handle(BranchRollbackRequest request);

    void handle(UndoLogDeleteRequest request);
}
Now look at the core logic of GlobalTransactionScanner#wrapIfNecessary:
- In TCC mode it creates a TccActionInterceptor; in the other modes it creates a GlobalTransactionalInterceptor
- The invoke() method of GlobalTransactionalInterceptor reads the @GlobalTransactional annotation and calls handleGlobalTransaction()
- GlobalTransactionalInterceptor#handleGlobalTransaction then enters TransactionalTemplate#execute
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    ...
    try {
        synchronized (PROXYED_SET) {
            ...
            interceptor = null;
            // TCC mode: create a TccActionInterceptor
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)interceptor);
            } else {
                ......
                if (globalTransactionalInterceptor == null) {
                    // AT, XA and Saga modes: create a GlobalTransactionalInterceptor
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }
            ......
        }
    }
}

// GlobalTransactionalInterceptor#invoke
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // Read the @GlobalTransactional annotation
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    // Populate the transactional object from the @GlobalTransactional annotation
                    transactional = new AspectTransactional(...);
                } else {
                    transactional = this.aspectTransactional;
                }
                // Handle the global transaction
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}

// GlobalTransactionalInterceptor#handleGlobalTransaction
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final AspectTransactional aspectTransactional) throws Throwable {
    boolean succeed = true;
    try {
        // Enter transactionalTemplate.execute()
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            public String name() {...}

            @Override
            public TransactionInfo getTransactionInfo() {...}
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        ...
    } finally {
        if (ATOMIC_DEGRADE_CHECK.get()) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}
The Javadoc on the @GlobalTransactional annotation also links directly to these two key methods:
- GlobalTransactionScanner#wrapIfNecessary is the entry point of the AOP handling for the GlobalTransactional annotation
- GlobalTransactionalInterceptor#handleGlobalTransaction is where transactionalTemplate#execute gets called
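The interception idea can be sketched without Spring. The example below uses a plain JDK dynamic proxy instead of AbstractAutoProxyCreator, so it shows the general pattern rather than Seata's mechanism. `DemoGlobalTx`, `OrderService`, `OrderServiceImpl` and `ProxySketch` are hypothetical names, and the "begin", "commit" and "rollback" events are placeholders for the calls that the real interceptor delegates to TransactionalTemplate.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation standing in for @GlobalTransactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTx {}

interface OrderService {
    String saveOrder(String item);
    String findOrder(String id);
}

class OrderServiceImpl implements OrderService {
    @DemoGlobalTx
    public String saveOrder(String item) { return "saved:" + item; }

    public String findOrder(String id) { return "order:" + id; }
}

final class ProxySketch {
    // Records what the interceptor did, so the behavior is observable.
    static final List<String> EVENTS = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T wrapIfNecessary(T target, Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            // The annotation sits on the implementation, so resolve the method on the target class.
            Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
            boolean transactional = impl.isAnnotationPresent(DemoGlobalTx.class);
            if (transactional) {
                EVENTS.add("begin");
            }
            try {
                Object result = method.invoke(target, args);
                if (transactional) {
                    EVENTS.add("commit");
                }
                return result;
            } catch (InvocationTargetException e) {
                if (transactional) {
                    EVENTS.add("rollback");
                }
                throw e.getCause();
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

Calling `saveOrder` through the proxy records "begin" and "commit", while `findOrder` passes straight through because it carries no annotation.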
TransactionalTemplate
We now arrive at the transaction template method, io.seata.tm.api.TransactionalTemplate.execute(). Everything that follows about the global transaction starts from this template method.
public Object execute(TransactionalExecutor business) throws Throwable {
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            // ......
            // Propagation handling
        }
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
        if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {
            LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());
        }
        try {
            // Begin the global transaction
            beginTransaction(txInfo, tx);
            Object rs;
            try {
                // Run the business method
                rs = business.execute();
            } catch (Throwable ex) {
                // Roll back the global transaction
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }
            // Commit the global transaction
            commitTransaction(tx, txInfo);
            return rs;
        } finally {
            // Clean up
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
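Stripped of propagation, hooks and lock configuration, the template reduces to begin, run the business code, then commit or roll back. The sketch below shows only that skeleton. `TemplateSketch`, `Tx` and `RecordingTx` are hypothetical names, and unlike the real template it catches only RuntimeException and applies no rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class TemplateSketch {
    // Hypothetical stand-in for the TM-side handle of a global transaction.
    interface Tx {
        void begin();
        void commit();
        void rollback();
    }

    // Records the calls it receives so their order can be inspected.
    static final class RecordingTx implements Tx {
        final List<String> calls = new ArrayList<>();
        public void begin() { calls.add("begin"); }
        public void commit() { calls.add("commit"); }
        public void rollback() { calls.add("rollback"); }
    }

    // begin -> business -> commit, or rollback and rethrow when the business code fails.
    static <T> T execute(Tx tx, Supplier<T> business) {
        tx.begin();
        T result;
        try {
            result = business.get();
        } catch (RuntimeException ex) {
            tx.rollback();
            throw ex;
        }
        tx.commit();
        return result;
    }
}
```

Note that the commit sits outside the try block, as in the original method, so a failure during commit does not trigger the rollback branch.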
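Starting a global transaction
Global transaction begin flow (online flowchart)
TM starts the global transaction
TransactionalTemplate#beginTransaction works as follows:
- Call DefaultGlobalTransaction#begin
- Check whether the current role is the global transaction launcher; if not, skip the rest of the flow
- Throw an exception if the current xid != null
- Have the TM ask the TC to begin a global transaction and obtain the XID: DefaultTransactionManager#begin sends a GlobalBeginRequest
- Set the global transaction status to Begin
- Save the XID in the RootContext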
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin();
        // Enter DefaultGlobalTransaction#begin
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);
    }
}

// DefaultGlobalTransaction#begin
public void begin(int timeout, String name) throws TransactionException {
    this.createTime = System.currentTimeMillis();
    // Only the launcher of the global transaction continues; any other role returns here
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    // Throw if the xid field is already set
    assertXIDNull();
    // In the normal flow RootContext holds no XID yet
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists," +
            " can't begin a new global transaction, currentXid = " + currentXid);
    }
    // The TM asks the TC to begin a global transaction and receives the XID
    xid = transactionManager.begin(null, null, name, timeout);
    // Set the global transaction status to Begin
    status = GlobalStatus.Begin;
    // Save the XID in the RootContext
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}

// DefaultTransactionManager#begin
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // Build the request object
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // Call the TC to begin the global transaction
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    // Return the XID
    return response.getXid();
}
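On the calling service, a web interceptor such as SeataFeignRequestInterceptor takes the XID out of RootContext and puts it into the request headers.
On the downstream service, an interceptor such as SeataHandlerInterceptor reads the XID from the request headers and stores it in RootContext.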
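The propagation described above boils down to copying a thread-bound value into headers on the way out and binding it again on the way in. The sketch below models this with a ThreadLocal and a plain map of headers. `XidPropagationSketch` and its methods are hypothetical, and the header name "TX_XID" is an assumption about the key used; check RootContext.KEY_XID in your Seata version before relying on it.

```java
import java.util.Map;

final class XidPropagationSketch {
    // Assumed header name, standing in for the key defined by Seata's RootContext.
    static final String HEADER_XID = "TX_XID";

    // Simplified stand-in for RootContext: one XID per thread.
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    static void bind(String xid) { CURRENT_XID.set(xid); }

    static String getXid() { return CURRENT_XID.get(); }

    static void unbind() { CURRENT_XID.remove(); }

    // Caller side: copy the bound XID, if any, into the outgoing request headers.
    static void beforeSend(Map<String, String> headers) {
        String xid = getXid();
        if (xid != null) {
            headers.put(HEADER_XID, xid);
        }
    }

    // Callee side: bind the XID found in the incoming headers; returns true if one was bound.
    static boolean beforeHandle(Map<String, String> headers) {
        String xid = headers.get(HEADER_XID);
        if (xid == null) {
            return false;
        }
        bind(xid);
        return true;
    }

    // Callee side: clear the thread-bound XID once the request is done.
    static void afterHandle() { unbind(); }
}
```

Unbinding after the request matters on pooled threads, otherwise a later unrelated request could inherit a stale XID.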
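TC handles the TM request
To handle the GlobalBeginRequest sent by the TM, the TC has to:
- Begin the global transaction by building a GlobalSession object and generating the xid; this object is the entity that maps to the global_table table
- Store the global transaction; in DB mode this inserts a row into global_table
- Return the XID
How do we find the source entry point on the Seata Server side, that is, the TC? Seata is built on Netty, so there are inbound and outbound handlers. The RM, for example, has the RMInboundHandler interface for the branch commit and rollback requests sent by the TC. By analogy we should be able to find a top-level TCInboundHandler interface: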
public interface TCInboundHandler {
    /**
     * Handle global begin response.
     */
    GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);

    /**
     * Handle global commit response.
     */
    GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);

    /**
     * Handle global rollback response.
     */
    GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);

    BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);

    BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);

    GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);

    GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);

    GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);
}
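The request ends up in DefaultCoordinator#doGlobalBegin. The full flow for beginning a global transaction is:
- Build a GlobalSession object, which generates the global transaction XID
- Call begin() on the GlobalSession, which sets the status to Begin and records the begin time
- After a few more calls, insert a global transaction row into the global_table table in the DB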
// DefaultCoordinator#doGlobalBegin
// Delegates to core.begin() and sets the xid on the response
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    // Call begin()
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info(...);
    }
}

// DefaultCore#begin
// Builds a GlobalSession (which generates the XID), then calls session.begin()
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // Build a GlobalSession; this generates the global transaction XID
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // Call begin()
    session.begin();
    MetricsPublisher.postSessionDoingEvent(session, false);
    return session.getXid();
}

// GlobalSession#begin
@Override
public void begin() throws TransactionException {
    // Set the global transaction status to Begin
    this.status = GlobalStatus.Begin;
    // Record the begin time of the global transaction
    this.beginTime = System.currentTimeMillis();
    // active is true here; it is set to false on global commit or rollback,
    // after which no more branch transactions can register
    this.active = true;
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        // Enter onBegin()
        lifecycleListener.onBegin(this);
    }
}

// AbstractSessionManager#onBegin
@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {
    // The DB, file and redis implementations each override addGlobalSession()
    addGlobalSession(globalSession);
}

// DataBaseSessionManager#addGlobalSession
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    if (StringUtils.isBlank(taskName)) {
        // writeSession() inserts a row into global_table
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        if (!ret) {
            throw new StoreException("addGlobalSession failed.");
        }
    } else {
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
        if (!ret) {
            throw new StoreException("addGlobalSession failed.");
        }
    }
}

// DataBaseTransactionStoreManager#writeSession
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        // Insert a row into global_table in the DB.
        // This reaches LogStoreDataBaseDAO#insertGlobalTransactionDO, which runs the INSERT.
        // MySQL, Oracle and PostgreSQL are supported.
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
}
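Global transaction commit
Global transaction commit (online flowchart)
TM in the microservice sends the request
When the business method of the global transaction has finished without an exception, the global transaction must be committed. The TM sends a global commit request to the TC, and by default this request is attempted up to 5 times.
The entry point is TransactionalTemplate#commitTransaction. In detail:
- Check for a timeout; on timeout, switch to the global rollback flow. Otherwise call DefaultGlobalTransaction#commit
- Only the launcher of the global transaction may commit it; a participant cannot
- The XID must exist at this point in the commit flow; if it is null an exception is thrown
- Retry loop, 5 attempts by default
- Decrement the retry counter (retry--)
- The TM sends the global commit request to the TC: transactionManager.commit(xid)
- DefaultTransactionManager#commit sends a GlobalCommitRequest to the TC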
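The retry loop in the steps above can be sketched in isolation. `CommitRetrySketch` and `commitWithRetry` are hypothetical names and the supplier stands in for transactionManager.commit(xid). The way the sketch reports final failure (an IllegalStateException wrapping the last error) is an assumption for illustration, not Seata's actual exception handling.

```java
import java.util.function.Supplier;

final class CommitRetrySketch {
    // Mirrors the default of 5 attempts mentioned in the text.
    static final int DEFAULT_RETRY_COUNT = 5;

    // Calls commitCall until it succeeds or the attempts are used up.
    // A configured value <= 0 falls back to the default.
    static <T> T commitWithRetry(int configuredRetries, Supplier<T> commitCall) {
        int retry = configuredRetries <= 0 ? DEFAULT_RETRY_COUNT : configuredRetries;
        RuntimeException last = null;
        while (retry > 0) {
            retry--;
            try {
                // First success ends the loop.
                return commitCall.get();
            } catch (RuntimeException ex) {
                last = ex;
            }
        }
        throw new IllegalStateException("commit failed after retries", last);
    }
}
```

The counter is decremented before the call, so a value of 5 means five attempts in total rather than one attempt plus five retries.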
// TransactionalTemplate#commitTransaction
private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo) throws ... {
    // Timeout check
    if (isTimeout(tx.getCreateTime(), txInfo)) {
        Exception exx = new TmTransactionException(...);
        // Global rollback flow
        rollbackTransaction(tx, exx);
        return;
    }
    try {
        triggerBeforeCommit();
        // Enter DefaultGlobalTransaction#commit
        tx.commit();
        GlobalStatus afterCommitStatus = tx.getLocalStatus();
        TransactionalExecutor.Code code = TransactionalExecutor.Code.Unknown;
        ...
    } catch (TransactionException txe) {
        ...
    }
}

// DefaultGlobalTransaction#commit
@Override
public void commit() throws TransactionException {
    // Only the launcher may commit the global transaction; a participant returns here
    if (role == GlobalTransactionRole.Participant) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    // The XID must exist; throw if it is null
    assertXIDNotNull();
    // Retry loop, 5 attempts by default
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                // Decrement the retry counter
                retry--;
                // The TM sends the global commit request to the TC via DefaultTransactionManager#commit
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                ...
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend(true);
        }
    }
}

// DefaultTransactionManager#commit
@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // Build the GlobalCommitRequest
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    // Send the request to the TC
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}
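Here the TM has sent a GlobalCommitRequest to the TC.
TC handles the TM request
To handle the GlobalCommitRequest sent by the TM, the TC has to:
- Delete the global lock rows from the lock_table table
- Send a BranchCommitRequest to every branch transaction, driving each RM to delete its rows from the undo_log table
- Delete the branch transaction rows from the branch_table table
- Delete the global transaction row from the global_table table
The entry point is DefaultCoordinator#doGlobalCommit. The request is processed as follows:
- DefaultCore#commit(xid) is called and looks up the GlobalSession by xid
- It checks whether the global transaction has timed out
- It calls globalSession.closeAndClean() to close the session and release the global locks. close() sets the session's active attribute to false so that no more branch transactions can register; clean() deletes the global lock rows for the xid from lock_table
- It calls globalSession.asyncCommit() to enter the asynchronous commit path, which updates the status column of the global transaction in global_table
- The asynchronous commit is processed by a scheduled task set up in the init() method of DefaultCoordinator: the task calls handleAsyncCommitting(), which in turn calls DefaultCore#doGlobalCommit
- All branch transactions of the global transaction are processed in a loop: branchCommit() sends the request to the RM and returns the branch status after the commit. Normally that status is PhaseTwo_Committed, and removeBranch() is then called to delete the branch row from branch_table
- The global transaction status is set to Committed
- SessionHelper.endCommitted() is called, which calls globalSession.end() to delete the lock_table rows and the global_table row for the xid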
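The two-step shape of this flow, a fast synchronous part on the commit request and a deferred part run by a scheduled task, can be sketched with in-memory maps. Everything here is hypothetical (`TcCommitSketch`, `Session`, `Status`, and the map and set that stand in for global_table and lock_table). Branch commits to the RMs, timeouts, retries and failure states are left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TcCommitSketch {
    enum Status { BEGIN, ASYNC_COMMITTING, COMMITTED }

    static final class Session {
        final String xid;
        Status status = Status.BEGIN;
        boolean active = true;
        final List<Long> branchIds = new ArrayList<>();

        Session(String xid) { this.xid = xid; }
    }

    // In-memory stand-ins for global_table and lock_table.
    final Map<String, Session> globalTable = new HashMap<>();
    final Set<String> lockTable = new HashSet<>(); // xids that currently hold global locks

    // Step 1, on the commit request: close the session, release locks, mark it for async commit.
    void onGlobalCommit(String xid) {
        Session s = globalTable.get(xid);
        if (s == null || s.status != Status.BEGIN) {
            return;
        }
        s.active = false;      // like close(): no further branch may register
        lockTable.remove(xid); // like clean(): release global locks by xid
        s.status = Status.ASYNC_COMMITTING;
    }

    // Step 2, run periodically: finish every session that is waiting for async commit.
    void handleAsyncCommitting() {
        for (Session s : new ArrayList<>(globalTable.values())) {
            if (s.status != Status.ASYNC_COMMITTING) {
                continue;
            }
            s.branchIds.clear();       // stands in for committing and removing each branch
            s.status = Status.COMMITTED;
            globalTable.remove(s.xid); // stands in for deleting the global_table row
        }
    }
}
```

Splitting the work this way lets the TC answer the TM quickly, because releasing locks is the only part other transactions are waiting on.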
The source:
// DefaultCoordinator#doGlobalCommit
// Delegates to DefaultCore#commit
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
    throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    // Call core.commit()
    response.setGlobalStatus(core.commit(request.getXid()));
}

// DefaultCore#commit
// Checks for timeout, closes the session, deletes the lock_table rows for the xid,
// then enters the asynchronous commit path
@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // Look up the GlobalSession by xid
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    // Check whether the global transaction has timed out
    if (globalSession.isTimeout()) {
        LOGGER.info("TC detected timeout, xid = {}", globalSession.getXid());
        return GlobalStatus.TimeoutRollbacking;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        // Normally globalSession.getStatus() is Begin at this point
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // Highlight: Firstly, close the session, then no more branch can be registered.
            // close() sets the active attribute to false so no more branches can register
            // clean() deletes the lock_table rows for the xid
            globalSession.closeAndClean();
            if (globalSession.canBeCommittedAsync()) {
                // Asynchronous commit path
                globalSession.asyncCommit();
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                return false;
            } else {
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });
    // ......
}
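Now look at what closeAndClean does:
- close() closes the session by setting the globalSession's active attribute to false, so no more branch transactions can register
- clean() deletes the global lock rows for the xid from lock_table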
// GlobalSession#closeAndClean
public void closeAndClean() throws TransactionException {
    // Set the globalSession's active attribute to false
    close();
    if (this.hasATBranch()) {
        // Delete the lock_table rows for the xid
        clean();
    }
}

public void clean() throws TransactionException {
    // Delete the lock_table rows for the xid
    if (!LockerManagerFactory.getLockManager().releaseGlobalSessionLock(this)) {
        throw new TransactionException("UnLock globalSession error, xid = " + this.xid);
    }
}

// DataBaseLockManager#releaseGlobalSessionLock
@Override
public boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException {
    try {
        // Delete the lock_table rows for the xid
        return getLocker().releaseLock(globalSession.getXid());
    } catch (Exception t) {
        LOGGER.error("unLock globalSession error, xid:{}", globalSession.getXid(), t);
        return false;
    }
}

// DataBaseLocker#releaseLock
@Override
public boolean releaseLock(String xid) {
    try {
        // Delete the lock_table rows for the xid
        return lockStore.unLock(xid);
    } catch (StoreException e) {
        throw e;
    } catch (Exception t) {
        LOGGER.error("unLock by branchIds error, xid {}", xid, t);
        return false;
    }
}

// LockStoreDataBaseDAO#unLock
@Override
public boolean unLock(String xid) {
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = lockStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        //batch release lock by branch list
        // Delete the lock_table rows for the xid
        String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSqlByXid(lockTable);
        ps = conn.prepareStatement(batchDeleteSQL);
        ps.setString(1, xid);
        ps.executeUpdate();
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
    return true;
}
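Next comes the asynchronous commit of the global transaction.
The init() method of DefaultCoordinator sets up a scheduled task that repeatedly triggers the asynchronous commit handling.
The method that actually processes the asynchronous commit is DefaultCoordinator#handleAsyncCommitting.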
public void init() {
    retryRollbacking.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
        ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    retryCommitting.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
        COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    // Asynchronous commit task: runs handleAsyncCommitting() every ASYNC_COMMITTING_RETRY_PERIOD milliseconds
    asyncCommitting.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
        ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    timeoutCheck.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
        TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    undoLogDelete.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
        UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
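The source of DefaultCoordinator#handleAsyncCommitting does the following:
- Send the branch commit request to each RM
- Delete the branch transaction rows from branch_table
- Delete the global lock rows from lock_table and the global transaction row from global_table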
protected void handleAsyncCommitting() {
    // Asynchronous commit handler, invoked by the scheduled task
    // set up in DefaultCoordinator#init()
    SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
    Collection<GlobalSession> asyncCommittingSessions =
        SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
    if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
        return;
    }
    SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
        try {
            asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            // Asynchronous commit: enter DefaultCore#doGlobalCommit
            core.doGlobalCommit(asyncCommittingSession, true);
        } catch (TransactionException ex) {
            LOGGER.error(..);
        }
    });
}
// DefaultCore#doGlobalCommit
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
    if (globalSession.isSaga()) {
        // Saga flow
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        // Loop over every branch transaction of the global transaction
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
            if (!retrying && branchSession.canBeCommittedAsync()) {
                return CONTINUE;
            }
            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                return CONTINUE;
            }
            try {
                // branchCommit() sends the request to the RM and returns the branch status after commit
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                // Check the branch status
                if (isXaerNotaTimeout(globalSession,branchStatus)) {
                    LOGGER.info(...);
                    branchStatus = BranchStatus.PhaseTwo_Committed;
                }
                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        // removeBranch() deletes the branch row from branch_table
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        LOGGER.info(...);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        //not at branch
                        SessionHelper.endCommitFailed(globalSession, retrying);
                        LOGGER.error(...);
                        return false;
                    default:
                        if (!retrying) {
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            LOGGER.error(...);
                            return CONTINUE;
                        } else {
                            LOGGER.error(...);
                            return false;
                        }
                }
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {branchSession.toString()});
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            return CONTINUE;
        });
        if (result != null) {
            return result;
        }
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    if (success && globalSession.getBranchSessions().isEmpty()) {
        if (!retrying) {
            // Set the global transaction status to Committed
            globalSession.setStatus(GlobalStatus.Committed);
        }
        // Deletes the lock_table rows and the global_table row for the xid
        SessionHelper.endCommitted(globalSession, retrying);
        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}
// AbstractCore#branchCommit
// The TC calls the RM to commit the branch by sending a BranchCommitRequest
@Override
public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    try {
        // Build the BranchCommitRequest
        BranchCommitRequest request = new BranchCommitRequest();
        request.setXid(branchSession.getXid());
        request.setBranchId(branchSession.getBranchId());
        request.setResourceId(branchSession.getResourceId());
        request.setApplicationData(branchSession.getApplicationData());
        request.setBranchType(branchSession.getBranchType());
        // Send it to the RM to commit the branch
        return branchCommitSend(request, globalSession, branchSession);
    } catch (IOException | TimeoutException e) {
        throw new BranchTransactionException(...);
    }
}
// SessionHelper#removeBranch
public static void removeBranch(GlobalSession globalSession, BranchSession branchSession, boolean isAsync)
    throws TransactionException {
    // Delete lock_table rows once more, this time by branch_id
    globalSession.unlockBranch(branchSession);
    if (isEnableBranchRemoveAsync() && isAsync) {
        COORDINATOR.doBranchRemoveAsync(globalSession, branchSession);
    } else {
        // Delete the branch row from branch_table
        globalSession.removeBranch(branchSession);
    }
}

// GlobalSession#removeBranch
@Override
public void removeBranch(BranchSession branchSession) throws TransactionException {
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        // Delete the branch row from branch_table
        lifecycleListener.onRemoveBranch(this, branchSession);
    }
    remove(branchSession);
}

// AbstractSessionManager#onRemoveBranch
@Override
public void onRemoveBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    // The DB, file and redis implementations each override removeBranchSession()
    // Delete the branch row from branch_table
    removeBranchSession(globalSession, branchSession);
}

// DataBaseSessionManager#removeBranchSession
@Override
public void removeBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {
    if (StringUtils.isNotBlank(taskName)) {
        return;
    }
    // Delete the branch row from branch_table;
    // from here on the code only executes SQL
    boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_REMOVE, session);
    if (!ret) {
        throw new StoreException("removeBranchSession failed.");
    }
}
// Once the branches are removed, the global transaction row is deleted
// SessionHelper#endCommitted
public static void endCommitted(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
    if (retryGlobal || !DELAY_HANDLE_SESSION) {
        long beginTime = System.currentTimeMillis();
        boolean retryBranch = globalSession.getStatus() == GlobalStatus.CommitRetrying;
        globalSession.changeGlobalStatus(GlobalStatus.Committed);
        // Deletes the lock_table rows and the global_table row for the xid
        globalSession.end();
        if (!DELAY_HANDLE_SESSION) {
            MetricsPublisher.postSessionDoneEvent(globalSession, false, false);
        }
        MetricsPublisher.postSessionDoneEvent(globalSession, IdConstants.STATUS_VALUE_AFTER_COMMITTED_KEY, true,
            beginTime, retryBranch);
    } else {
        if (globalSession.isSaga()) {
            globalSession.setStatus(GlobalStatus.Committed);
            globalSession.end();
        }
        MetricsPublisher.postSessionDoneEvent(globalSession, false, false);
    }
}

// GlobalSession#end
@Override
public void end() throws TransactionException {
    if (GlobalStatus.isTwoPhaseSuccess(status)) {
        // Clean locks first
        // Delete the lock_table rows for the xid again
        clean();
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            // Delete the global_table row for the xid
            lifecycleListener.onSuccessEnd(this);
        }
    } else {
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onFailEnd(this);
        }
    }
}

// AbstractSessionManager#onSuccessEnd
@Override
public void onSuccessEnd(GlobalSession globalSession) throws TransactionException {
    // The DB, file and redis implementations each override removeGlobalSession()
    // Delete the global_table row for the xid
    removeGlobalSession(globalSession);
}
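RM handles the TC request
To handle the BranchCommitRequest sent by the TC, the RM has to:
- Delete the rows in the undo_log table
The entry point for a branch commit request is AbstractRMHandler.handle(BranchCommitRequest request). The overall flow is:
- handle() calls doBranchCommit(), which calls into the resource manager, DataSourceManager#branchCommit
- The branch commit is asynchronous: AsyncWorker#branchCommit wraps xid + branchId + resourceId in a Phase2Context object and adds it to commitQueue
- AsyncWorker#doBranchCommit processes the Phase2Context objects in the queue, obtaining a database connection and the UndoLogManager
- undoLogManager.batchDeleteUndoLog() deletes the undo_log rows in batches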
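The queue handling in these steps follows a drain, group, partition pattern. The sketch below reproduces that pattern with standard collections only. `AsyncCommitQueueSketch` and `Phase2Ctx` are hypothetical names, the batch limit is a constructor parameter rather than Seata's constant, and each batch delete is replaced by an entry in a list so that the batching is visible.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class AsyncCommitQueueSketch {
    // Hypothetical counterpart of the phase-two context: which branch, on which resource.
    static final class Phase2Ctx {
        final String xid;
        final long branchId;
        final String resourceId;

        Phase2Ctx(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    final BlockingQueue<Phase2Ctx> queue;
    final int batchLimit;
    // Each entry is "resourceId:rowCount" and stands in for one batch DELETE on undo_log.
    final List<String> deletions = new ArrayList<>();

    AsyncCommitQueueSketch(int capacity, int batchLimit) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.batchLimit = batchLimit;
    }

    // Non-blocking enqueue; returns false when the queue is full.
    boolean offer(Phase2Ctx ctx) {
        return queue.offer(ctx);
    }

    void drainAndDelete() {
        // 1. Take everything currently queued.
        List<Phase2Ctx> all = new ArrayList<>();
        queue.drainTo(all);
        // 2. Group by resource, since each resource has its own connection.
        Map<String, List<Phase2Ctx>> grouped = new LinkedHashMap<>();
        for (Phase2Ctx ctx : all) {
            grouped.computeIfAbsent(ctx.resourceId, k -> new ArrayList<>()).add(ctx);
        }
        // 3. Split each group into batches no larger than batchLimit.
        grouped.forEach((resourceId, contexts) -> {
            for (int from = 0; from < contexts.size(); from += batchLimit) {
                int to = Math.min(from + batchLimit, contexts.size());
                deletions.add(resourceId + ":" + (to - from));
            }
        });
    }
}
```

Grouping first and batching second keeps every statement on a single data source while bounding the size of each delete.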
// AbstractRMHandler#handle
// Mainly calls doBranchCommit()
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
    // Response object for the branch commit
    BranchCommitResponse response = new BranchCommitResponse();
    // Enter doBranchCommit()
    exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
        @Override
        public void execute(BranchCommitRequest request, BranchCommitResponse response)
            throws TransactionException {
            doBranchCommit(request, response);
        }
    }, request, response);
    return response;
}

// AbstractRMHandler#doBranchCommit
// Reads the request parameters, then calls the resource manager; in AT mode this enters DataSourceManager#branchCommit
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
    // Read the request parameters
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    // Call the resource manager to commit the branch; AT mode enters DataSourceManager#branchCommit
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }
}
// DataSourceManager#branchCommit
// Calls AsyncWorker#branchCommit, which handles the branch commit request asynchronously
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // Asynchronous branch commit: enters AsyncWorker#branchCommit,
    // which returns BranchStatus.PhaseTwo_Committed (phase two committed)
    return asyncWorker.branchCommit(xid, branchId, resourceId);
}

// AsyncWorker#branchCommit
// Wraps the request parameters in a Phase2Context and enqueues it through AsyncWorker#addToCommitQueue
public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
    Phase2Context context = new Phase2Context(xid, branchId, resourceId);
    // Add it to the commit queue
    addToCommitQueue(context);
    return BranchStatus.PhaseTwo_Committed;
}
// AsyncWorker#addToCommitQueue
// Enqueues the Phase2Context. If the queue is full, doBranchCommitSafely() is called to process the queued Phase2Context objects
private void addToCommitQueue(Phase2Context context) {
    // offer() does not throw when the queue is full; it returns false, and the code below then deals with this commit.
    // The method that processes commits asynchronously is doBranchCommit().
    if (commitQueue.offer(context)) {
        return;
    }
    // Run the method that drains the queue and processes the pending commits,
    // then try again to add the current context to commitQueue
    CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor)
        .thenRun(() -> addToCommitQueue(context));
}

// Asynchronous branch handling: once enqueued, the objects are taken off the queue and processed,
// and the undo log is deleted through the UndoLogManager
// AsyncWorker#doBranchCommit
private void doBranchCommit() {
    // Return immediately if the commit queue is empty
    if (commitQueue.isEmpty()) {
        return;
    }
    // Drain every phase-two context currently in the queue
    List<Phase2Context> allContexts = new LinkedList<>();
    commitQueue.drainTo(allContexts);
    // Group by resourceId
    Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
    // Call dealWithGroupedContexts(..) for each group
    groupedContexts.forEach(this::dealWithGroupedContexts);
}
// Called by the method above
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
    if (StringUtils.isBlank(resourceId)) {
        LOGGER.warn("resourceId is empty and will skip.");
        return;
    }
    DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
    if (dataSourceProxy == null) {
        LOGGER.warn("failed to find resource for {} and requeue", resourceId);
        addAllToCommitQueue(contexts);
        return;
    }
    Connection conn = null;
    try {
        // Get a database connection
        conn = dataSourceProxy.getPlainConnection();
        // Get the UndoLogManager for this database type
        UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
        // Split the contexts into lists of at most 1000 elements each (the delete limit)
        List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
        for (List<Phase2Context> partition : splitByLimit) {
            // Delete the rows from the undo_log table
            deleteUndoLog(conn, undoLogManager, partition);
        }
    } catch (SQLException sqlExx) {
        addAllToCommitQueue(contexts);
        LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx);
    } finally {
        IOUtil.close(conn);
    }
}

// Deletes the undo_log rows by calling batchDeleteUndoLog(xids, branchIds, conn) on the UndoLogManager
private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
    Set<String> xids = new LinkedHashSet<>(contexts.size());
    Set<Long> branchIds = new LinkedHashSet<>(contexts.size());
    contexts.forEach(context -> {
        xids.add(context.xid);
        branchIds.add(context.branchId);
    });
    try {
        // Batch-delete the undo_log rows through the UndoLogManager
        undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
        // Commit
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    } catch (SQLException e) {
        LOGGER.error("Failed to batch delete undo log", e);
        try {
            conn.rollback();
            addAllToCommitQueue(contexts);
        } catch (SQLException rollbackEx) {
            LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
        }
    }
}
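The enqueue, drain, group and batch steps of the asynchronous commit can be modelled in a few lines. The following is only a sketch under stated assumptions, not Seata's AsyncWorker: the class AsyncCommitSketch, the type Ctx (a stand-in for Phase2Context) and the deletedBatches list are hypothetical names, and the step that would call batchDeleteUndoLog() is replaced by recording the batch. Unlike the real addToCommitQueue(), the sketch simply returns false when the queue is full.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, simplified model of the flow above; not Seata's AsyncWorker.
final class AsyncCommitSketch {

    // Stand-in for Phase2Context: just the three identifiers.
    static final class Ctx {
        final String xid;
        final long branchId;
        final String resourceId;

        Ctx(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    private final BlockingQueue<Ctx> commitQueue;
    private final int deleteLimit;
    // Records each batch that the real code would hand to batchDeleteUndoLog().
    final List<List<Ctx>> deletedBatches = new ArrayList<>();

    AsyncCommitSketch(int queueCapacity, int deleteLimit) {
        this.commitQueue = new ArrayBlockingQueue<>(queueCapacity);
        this.deleteLimit = deleteLimit;
    }

    // Phase two commit only enqueues; offer() returns false when the queue is full.
    boolean branchCommit(String xid, long branchId, String resourceId) {
        return commitQueue.offer(new Ctx(xid, branchId, resourceId));
    }

    // Drain the queue, group by resourceId, then split each group into bounded batches.
    void doBranchCommit() {
        List<Ctx> all = new ArrayList<>();
        commitQueue.drainTo(all);
        Map<String, List<Ctx>> grouped = new LinkedHashMap<>();
        for (Ctx ctx : all) {
            grouped.computeIfAbsent(ctx.resourceId, k -> new ArrayList<>()).add(ctx);
        }
        for (List<Ctx> group : grouped.values()) {
            for (int from = 0; from < group.size(); from += deleteLimit) {
                int to = Math.min(from + deleteLimit, group.size());
                deletedBatches.add(new ArrayList<>(group.subList(from, to)));
            }
        }
    }

    public static void main(String[] args) {
        AsyncCommitSketch worker = new AsyncCommitSketch(10, 2);
        worker.branchCommit("xid-1", 1L, "db1");
        worker.branchCommit("xid-1", 2L, "db2");
        worker.branchCommit("xid-2", 3L, "db1");
        worker.doBranchCommit();
        System.out.println(worker.deletedBatches.size() + " delete batches");
    }
}
```

The point of the design is that the phase-two commit returns as soon as the context is queued; the undo_log cleanup happens later, one connection per resource and one bounded delete per batch.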
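Global transaction rollback
Global transaction rollback: online flowchart
TM sends the request
If an exception occurs while the business method of the global transaction is running, the global transaction has to be rolled back. The TM sends a global rollback request to the TC; by default the request is attempted up to 5 times.
The entry method is TransactionalTemplate#completeTransactionAfterThrowing. In detail:
- Validate the exception to decide whether it requires a rollback. If it does not, the flow goes to global commit instead; otherwise it calls DefaultGlobalTransaction#rollback.
- Only the launcher of the global transaction may roll it back. A participant cannot.
- The XID must exist. Having reached the rollback flow, a null XID causes an exception.
- Retry mechanism, 5 attempts by default:
  - Decrement the remaining attempt count
  - The TM sends the global rollback request to the TC via transactionManager.rollback(xid)
- DefaultTransactionManager#rollback sends a GlobalRollbackRequest to the TC.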
// Validates the exception to decide whether a global rollback is needed, then calls rollbackTransaction()
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)
    throws TransactionalExecutor.ExecutionException, TransactionException {
    // Check whether the current exception requires a rollback
    if (txInfo != null && txInfo.rollbackOn(originalException)) {
        // Global rollback
        rollbackTransaction(tx, originalException);
    } else {
        // Global commit
        commitTransaction(tx, txInfo);
    }
}

// This method mainly calls DefaultGlobalTransaction#rollback
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException)
    throws TransactionException, TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeRollback();
        // Enters DefaultGlobalTransaction#rollback
        tx.rollback();
        triggerAfterRollback();
    } catch (TransactionException txe) {
        // Failed to rollback
        throw new TransactionalExecutor.ExecutionException(...);
    }
    // ......
}

// DefaultGlobalTransaction#rollback
@Override
public void rollback() throws TransactionException {
    // Role check: a Participant returns immediately
    if (role == GlobalTransactionRole.Participant) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    // The XID must exist; an exception is thrown if it is null
    assertXIDNotNull();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("transaction {} will be rollback", xid);
    }
    // Retry mechanism: 5 attempts by default
    int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                // Decrement the remaining attempt count
                retry--;
                // The TM sends the global rollback request to the TC via DefaultTransactionManager#rollback
                status = transactionManager.rollback(xid);
                break;
            } catch (Throwable ex) {
                ...
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend(true);
        }
    }
}

// DefaultTransactionManager#rollback
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
    // Build the GlobalRollbackRequest
    GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
    globalRollback.setXid(xid);
    // Send the request to the TC
    GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
    return response.getGlobalStatus();
}
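The bounded retry around the rollback request follows a common pattern, which can be sketched in isolation. This is a minimal illustration, not the Seata code: RollbackRetrySketch and callWithRetry are hypothetical names, and because the listing above elides the body of the catch block, what happens once the attempts are exhausted (here, an IllegalStateException wrapping the last failure) is an assumption.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a bounded retry loop; not Seata's DefaultGlobalTransaction.
final class RollbackRetrySketch {

    // Mirrors the default of 5 attempts described in the text.
    static final int DEFAULT_RETRY_COUNT = 5;

    // Calls the action until it succeeds or the attempts run out.
    // A non-positive configuredRetry falls back to the default, as in the listing above.
    static <T> T callWithRetry(Supplier<T> action, int configuredRetry) {
        int retry = configuredRetry <= 0 ? DEFAULT_RETRY_COUNT : configuredRetry;
        RuntimeException last = null;
        while (retry > 0) {
            retry--;
            try {
                return action.get();
            } catch (RuntimeException ex) {
                // Assumption: remember the failure and try again while attempts remain.
                last = ex;
            }
        }
        throw new IllegalStateException("rollback request failed after all attempts", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String status = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("simulated timeout");
            }
            return "Rollbacked";
        }, 0);
        System.out.println(status + " after " + calls[0] + " attempts");
    }
}
```

Decrementing the counter before the call, as the original does, means the configured number is the total number of attempts rather than the number of retries after the first failure.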
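TC handles the TM's request
The TC handles the global rollback request (GlobalRollbackRequest) sent by the TM. In this phase the TC must:
- Delete the global lock rows in the lock_table table
- Call each branch transaction by sending it a branch rollback request (BranchRollbackRequest), which drives it to compensate its data and delete its rows in the undo_log table
- Delete the branch transaction rows in the branch_table table
- Delete the global transaction row in the global_table table
The entry method is DefaultCoordinator#doGlobalRollback. The request is processed as follows:
- DefaultCore#rollback(xid) is called and looks up the GlobalSession by xid.
- globalSession.close() closes the session so that no further branch transactions can register; it sets the globalSession's active flag to false.
- The global transaction status is changed to Rollbacking, and execution enters doGlobalRollback().
- The branch transactions of the current global transaction are traversed:
  - If a branch is in the phase-one failed state (PhaseOne_Failed), SessionHelper.removeBranch() deletes its global lock rows in lock_table and its row in branch_table. Because this branch threw an exception while executing its own business SQL, nothing was ever inserted into its undo_log table.
  - branchRollback(): the TC calls the RM to tell the branch to roll back.
  - If the rollback succeeds, SessionHelper.removeBranch() deletes the global lock rows in lock_table and the branch row in branch_table.
  - If the rollback fails, the global transaction status is changed and globalSession.end() is called; the default implementation logs an error message.
- SessionHelper.endRollbacked() changes the global transaction status to Rollbacked.
- globalSession.end() is then called, which deletes the global lock rows in lock_table and the global transaction row in global_table by xid.
The source code in detail: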
// DefaultCoordinator#doGlobalRollback
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    // Calls core.rollback()
    response.setGlobalStatus(core.rollback(request.getXid()));
}

// DefaultCore#rollback
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
    // Look up the GlobalSession by xid
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        // Close the session so that no further branches can register;
        // this sets the globalSession's active flag to false
        globalSession.close();
        // Change the global transaction status to Rollbacking
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }
    // Enters doGlobalRollback()
    boolean rollbackSuccess = doGlobalRollback(globalSession, false);
    return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}

// DefaultCore#doGlobalRollback
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start rollback event
    MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
    if (globalSession.isSaga()) {
        // Saga mode
        success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
    } else {
        // Traverse the branch transactions of this global transaction
        Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            // The branch failed in phase one
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                // Delete the global lock rows in lock_table and the branch row in branch_table
                SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                return CONTINUE;
            }
            try {
                // Tell the branch transaction to roll back
                BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                // Adjust the branch status
                if (isXaerNotaTimeout(globalSession, branchStatus)) {
                    LOGGER.info(...);
                    branchStatus = BranchStatus.PhaseTwo_Rollbacked;
                }
                switch (branchStatus) {
                    case PhaseTwo_Rollbacked:
                        // Enters removeBranch():
                        // delete the global lock rows in lock_table and the branch row in branch_table
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        LOGGER.info(...);
                        return CONTINUE;
                    case PhaseTwo_RollbackFailed_Unretryable:
                        // Phase-two rollback failed:
                        // change the global status, then call globalSession.end(); the default implementation logs an error
                        SessionHelper.endRollbackFailed(globalSession, retrying);
                        LOGGER.error(...);
                        // Returns false here
                        return false;
                    default:
                        LOGGER.error(...);
                        if (!retrying) {
                            globalSession.queueToRetryRollback();
                        }
                        return false;
                }
            } catch (Exception ex) {
                // ...
                throw new TransactionException(ex);
            }
        });
        // Return if the result is not null
        if (result != null) {
            return result;
        }
    }
    // In db mode, lock and branch data may be left behind, so this step has to be delayed and cannot run synchronously.
    if (success) {
        SessionHelper.endRollbacked(globalSession, retrying);
        LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
    }
    return success;
}
The methods called above are implemented as follows:
// SessionHelper#removeBranch
// Deletes the global lock rows in lock_table and the branch row in branch_table
public static void removeBranch(GlobalSession globalSession, BranchSession branchSession, boolean isAsync)
    throws TransactionException {
    // Delete the global lock rows in lock_table once more
    globalSession.unlockBranch(branchSession);
    if (isEnableBranchRemoveAsync() && isAsync) {
        COORDINATOR.doBranchRemoveAsync(globalSession, branchSession);
    } else {
        // Delete the branch row in branch_table
        globalSession.removeBranch(branchSession);
    }
}

// DefaultCore#branchRollback
// Delegates to branchRollback() of the core for this branch type: the TC calls the RM to roll back the branch
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    // Enters branchRollback(): the TC calls the RM to roll back the branch
    return getCore(branchSession.getBranchType()).branchRollback(globalSession, branchSession);
}

// AbstractCore#branchRollback
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    try {
        // Build the BranchRollbackRequest
        BranchRollbackRequest request = new BranchRollbackRequest();
        request.setXid(branchSession.getXid());
        request.setBranchId(branchSession.getBranchId());
        request.setResourceId(branchSession.getResourceId());
        request.setApplicationData(branchSession.getApplicationData());
        request.setBranchType(branchSession.getBranchType());
        // The TC sends the request to the RM
        return branchRollbackSend(request, globalSession, branchSession);
    } catch (IOException | TimeoutException e) {
        throw new BranchTransactionException(...);
    }
}

// SessionHelper#endRollbacked
// Changes the global transaction status to Rollbacked.
// Then calls end(), which deletes the global lock rows in lock_table and the global transaction row in global_table by xid
public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
    if (retryGlobal || !DELAY_HANDLE_SESSION) {
        long beginTime = System.currentTimeMillis();
        boolean timeoutDone = false;
        GlobalStatus currentStatus = globalSession.getStatus();
        if (currentStatus == GlobalStatus.TimeoutRollbacking) {
            MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.TimeoutRollbacked, false, false);
            timeoutDone = true;
        }
        boolean retryBranch =
            currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;
        if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
            globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
        } else {
            // Change the global transaction status to Rollbacked
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
        }
        // Call end(), which deletes the global lock rows in lock_table
        // and the global transaction row in global_table by xid
        globalSession.end();
        if (!DELAY_HANDLE_SESSION && !timeoutDone) {
            MetricsPublisher.postSessionDoneEvent(globalSession, false, false);
        }
        MetricsPublisher.postSessionDoneEvent(globalSession, IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY, true,
            beginTime, retryBranch);
    } else {
        if (globalSession.isSaga()) {
            globalSession.setStatus(GlobalStatus.Rollbacked);
            globalSession.end();
        }
        MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Rollbacked, false, false);
    }
}
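The branch traversal in DefaultCore#doGlobalRollback boils down to a loop with three outcomes per branch. The sketch below models only that control flow, under stated assumptions: GlobalRollbackSketch, Branch and rollbackAll are hypothetical names, the enum is a trimmed copy of a few BranchStatus constants, the call to the RM is replaced by a function parameter, and "removing" a branch just records its id. It also assumes the input list is in registration order, so that reversing it corresponds to getReverseSortedBranches().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the TC-side traversal; not Seata's DefaultCore.
final class GlobalRollbackSketch {

    // Trimmed copy of a few branch states used in the listing above.
    enum BranchStatus {
        Registered, PhaseOne_Failed, PhaseTwo_Rollbacked,
        PhaseTwo_RollbackFailed_Unretryable, PhaseTwo_RollbackFailed_Retryable
    }

    static final class Branch {
        final long id;
        final BranchStatus status;

        Branch(long id, BranchStatus status) {
            this.id = id;
            this.status = status;
        }
    }

    // Returns true only if every branch ends up removed. Ids of removed branches
    // (lock and branch rows deleted in the real code) are appended to 'removed'.
    static boolean rollbackAll(List<Branch> branches,
                               Function<Branch, BranchStatus> rmCall,
                               List<Long> removed) {
        List<Branch> reversed = new ArrayList<>(branches);
        Collections.reverse(reversed);
        for (Branch branch : reversed) {
            if (branch.status == BranchStatus.PhaseOne_Failed) {
                // Phase one never wrote an undo log, so there is nothing to ask the RM for.
                removed.add(branch.id);
                continue;
            }
            BranchStatus result = rmCall.apply(branch);
            if (result == BranchStatus.PhaseTwo_Rollbacked) {
                removed.add(branch.id);
            } else {
                // Any failure stops the traversal; the global rollback is not complete.
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Branch> branches = Arrays.asList(
            new Branch(1L, BranchStatus.Registered),
            new Branch(2L, BranchStatus.PhaseOne_Failed),
            new Branch(3L, BranchStatus.Registered));
        List<Long> removed = new ArrayList<>();
        boolean ok = rollbackAll(branches, b -> BranchStatus.PhaseTwo_Rollbacked, removed);
        System.out.println("success=" + ok + ", removed=" + removed);
    }
}
```

Only when the loop finishes without a failure does the real code go on to endRollbacked(), which is why a single failed branch leaves the global lock and global_table rows in place.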
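RM handles the TC's request
The RM handles the branch rollback request (BranchRollbackRequest) sent by the TC. In this phase the RM must:
- Query the undo_log table
- Compare the before image with the after image: if they are equal, no compensation is needed; otherwise continue
- Compare the after image with the current data: if they are equal, generate the reverse SQL and compensate; otherwise continue
- Compare the before image with the current data: if they are equal, no compensation is needed either; if they also differ, throw an exception
- Delete the undo_log rows
The entry point for the branch rollback request is AbstractRMHandler.handle(BranchRollbackRequest request). The overall flow is:
- AbstractRMHandler#handle calls doBranchRollback(..).
- doBranchRollback(..) reads the parameters from the request and calls the RM resource manager's branch rollback method; in AT mode this enters DataSourceManager#branchRollback.
- That method obtains the UndoLogManager and calls its undo(..) method, which resolves to AbstractUndoLogManager#undo.
- A query fetches the undo_log rows for branchId + xid. The context column is parsed to find the serializer, and the rollback info read by getRollbackInfo(rs) carries the before image and after image.
- The rollback info is decoded into a BranchUndoLog, whose List<SQLUndoLog> is traversed; the loop body calls AbstractUndoExecutor#executeOn.
- AbstractUndoExecutor#dataValidationAndGoOn decides whether the undo SQL has to run to restore the data, i.e. the "check after image" step:
  - Compare beforeRecords with afterRecords; if they are equal, the undo SQL does not need to run
  - Query the current data in the table
  - Compare afterRecords with currentRecords; if they are equal, run the undo SQL to compensate; otherwise continue
  - Compare beforeRecords with currentRecords; if they are equal, no restore is needed; otherwise throw SQLUndoDirtyException
- If none of the checks above returned early, the undo SQL is generated and executed.
- Finally, the undo_log rows are deleted.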
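To make the "generate undo SQL" step concrete, here is a rough sketch of what a reverse statement for an UPDATE could look like. It is an illustration under assumptions, not Seata's buildUndoSQL(): the class UndoSqlSketch, the method buildUndoUpdate and the table and column names are all hypothetical. The idea it shows matches the executeOn listing, where non-primary-key fields become the values to restore and the primary key fields locate the row.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of a reverse UPDATE; not Seata's undo executor.
final class UndoSqlSketch {

    // Builds a parameterised statement that writes the before-image values back:
    // non-primary-key columns go into SET, primary key columns into WHERE.
    static String buildUndoUpdate(String table, List<String> nonPkColumns, List<String> pkColumns) {
        String setClause = nonPkColumns.stream()
            .map(column -> column + " = ?")
            .collect(Collectors.joining(", "));
        String whereClause = pkColumns.stream()
            .map(column -> column + " = ?")
            .collect(Collectors.joining(" AND "));
        return "UPDATE " + table + " SET " + setClause + " WHERE " + whereClause;
    }

    public static void main(String[] args) {
        String sql = buildUndoUpdate("account", Arrays.asList("balance", "updated_at"), Arrays.asList("id"));
        System.out.println(sql);
    }
}
```

The placeholders would then be bound row by row from the before image, which is the role undoPrepare(undoPST, undoValues, pkValueList) plays in the original code.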
// AbstractRMHandler#handle calls doBranchRollback(..)
@Override
public BranchRollbackResponse handle(BranchRollbackRequest request) {
    // Response object for the branch rollback
    BranchRollbackResponse response = new BranchRollbackResponse();
    // Enters doBranchRollback()
    exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
        @Override
        public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
            throws TransactionException {
            doBranchRollback(request, response);
        }
    }, request, response);
    return response;
}

// Reads the parameters from the request and calls the RM resource manager's branch rollback method; AT mode enters DataSourceManager#branchRollback
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
    throws TransactionException {
    // Read the request parameters
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
    }
    // Call the RM resource manager's branch rollback method; AT mode enters DataSourceManager#branchRollback
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch Rollbacked result: " + status);
    }
}

// DataSourceManager#branchRollback
// Obtains the UndoLogManager and calls its undo(..) method, which resolves to AbstractUndoLogManager#undo
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
    }
    try {
        // Obtain the UndoLogManager and call undo(..), which resolves to AbstractUndoLogManager#undo
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);
        }
    } catch (TransactionException te) {
        // ......
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

// AbstractUndoLogManager#undo
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    ConnectionProxy connectionProxy = null;
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;
    for (; ; ) {
        try {
            connectionProxy = dataSourceProxy.getConnection();
            // Get the underlying database connection
            conn = connectionProxy.getTargetConnection();
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            // Prepare the SELECT statement
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            // Execute the query
            rs = selectPST.executeQuery();
            boolean exists = false;
            while (rs.next()) {
                exists = true;
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }
                // The context column of undo_log is parsed for metadata such as the serializer;
                // the before image and after image come from the rollback info
                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);
                // Serializer name
                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                // Pick the parser and decode the undo log
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
                try {
                    setCurrentSerializer(parser.getName());
                    // All SQLUndoLog objects in the undo_log row for this branchId and xid
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    // Traverse the SQLUndoLog list
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType())
                            .getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        // Enters executeOn(), which performs the "check after image" validation
                        undoExecutor.executeOn(connectionProxy);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }
            if (exists) {
                // Delete the undo_log rows
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            } else {
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            }
            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            //......
        } catch (Throwable e) {
            //......
        } finally {
            //......
        }
    }
}

// AbstractUndoExecutor#executeOn
public void executeOn(ConnectionProxy connectionProxy) throws SQLException {
    Connection conn = connectionProxy.getTargetConnection();
    // Decide whether the undo log has to be executed; the restore below only runs when after image == current data
    if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(connectionProxy)) {
        return;
    }
    PreparedStatement undoPST = null;
    try {
        // Build the undo SQL statement
        String undoSQL = buildUndoSQL();
        // Prepare the statement
        undoPST = conn.prepareStatement(undoSQL);
        // Bind the SQL parameters
        TableRecords undoRows = getUndoRows();
        for (Row undoRow : undoRows.getRows()) {
            ArrayList<Field> undoValues = new ArrayList<>();
            List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, connectionProxy.getDbType());
            for (Field field : undoRow.getFields()) {
                if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                    undoValues.add(field);
                }
            }
            undoPrepare(undoPST, undoValues, pkValueList);
            // Execute the SQL
            undoPST.executeUpdate();
        }
    } catch (Exception ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        } else {
            throw new SQLException(ex);
        }
    } finally {
        //important for oracle
        IOUtil.close(undoPST);
    }
}

protected boolean dataValidationAndGoOn(ConnectionProxy conn) throws SQLException {
    TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
    TableRecords afterRecords = sqlUndoLog.getAfterImage();
    // Compare beforeRecords with afterRecords; if they are equal, the undo SQL does not need to run
    Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
    if (beforeEqualsAfterResult.getResult()) {
        return false;
    }
    // Query the current data in the table
    TableRecords currentRecords = queryCurrentRecords(conn);
    // Compare afterRecords with currentRecords; if they are equal, run the undo SQL; otherwise keep checking
    Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
    if (!afterEqualsCurrentResult.getResult()) {
        // Compare beforeRecords with currentRecords; if they are equal, no restore is needed,
        // otherwise throw SQLUndoDirtyException
        Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
        if (beforeEqualsCurrentResult.getResult()) {
            return false;
        } else {
            //...
            throw new SQLUndoDirtyException("Has dirty records when undo.");
        }
    }
    // All checks passed: go ahead and run the undo SQL to restore the data
    return true;
}
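The three comparisons in dataValidationAndGoOn() form a small decision table, which is easier to see once the database access is stripped away. The following is a sketch under assumptions: UndoValidationSketch, Decision and decide are hypothetical names, and plain object equality stands in for DataCompareUtils.isRecordsEquals().

```java
import java.util.Objects;

// Hypothetical reduction of the before / after / current comparison to a pure function.
final class UndoValidationSketch {

    enum Decision {
        SKIP_NO_CHANGE,        // before == after: the statement changed nothing
        RUN_UNDO,              // after == current: safe to write the before image back
        SKIP_ALREADY_RESTORED, // before == current: the data is already in its original state
        DIRTY                  // current matches neither image: someone else modified the row
    }

    // The order of the checks is the same as in the original method.
    static <T> Decision decide(T before, T after, T current) {
        if (Objects.equals(before, after)) {
            return Decision.SKIP_NO_CHANGE;
        }
        if (Objects.equals(after, current)) {
            return Decision.RUN_UNDO;
        }
        if (Objects.equals(before, current)) {
            return Decision.SKIP_ALREADY_RESTORED;
        }
        return Decision.DIRTY;
    }

    public static void main(String[] args) {
        // A balance that went from 100 to 90 in phase one and is still 90 now.
        System.out.println(decide(100, 90, 90));
    }
}
```

In the original code the first and third outcomes return false, the second returns true, and the last throws SQLUndoDirtyException.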
Supplementary notes
How a microservice finds the TC service
The source entry point is the NacosRegistryServiceImpl.lookup(String key) method.
public List<InetSocketAddress> lookup(String key) throws Exception {
    // Use service.vgroupMapping.${seata.tx-service-group} as the key to look up a value in the config center.
    // The value is the clusterName under the TC service.
    // The clusterName is then used to find the TC service's instances in the service registry.
    String clusterName = getServiceGroup(key);
    //......
    if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
        synchronized (LOCK_OBJ) {
            if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
                List<String> clusters = new ArrayList<>();
                clusters.add(clusterName);
                // Find the instances in the registry by ServiceName, ServiceGroup and clusterName
                List<Instance> firstAllInstances = getNamingInstance().getAllInstances(getServiceName(), getServiceGroup(), clusters);
                //......
            }
        }
    }
    return CLUSTER_ADDRESS_MAP.get(clusterName);
}
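So the first step is to read the value of the ${seata.tx-service-group} configuration item (default_tx_group in this example).
This gives the string service.vgroupMapping.default_tx_group, which is used as the key to look up a value in the config center. The value found there is the TC's clusterName.
With the TC's cluster name in hand, the client looks up the instances in the registry by ServiceName, ServiceGroup and clusterName.
The result is the list of TC service instances.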
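The two-step resolution can be mimicked with plain maps. This is a minimal sketch, assuming in-memory maps in place of the config center and the registry: TcLookupSketch and its fields are hypothetical names, the addresses are made-up sample values, and returning an empty list when no mapping exists is a choice made for the sketch rather than the behaviour of NacosRegistryServiceImpl.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the lookup; maps stand in for the config center and the registry.
final class TcLookupSketch {

    private final Map<String, String> configCenter;     // config key -> value
    private final Map<String, List<String>> registry;   // cluster name -> TC addresses

    TcLookupSketch(Map<String, String> configCenter, Map<String, List<String>> registry) {
        this.configCenter = configCenter;
        this.registry = registry;
    }

    // Step 1: tx-service-group -> config key -> cluster name.
    // Step 2: cluster name -> TC instances.
    List<String> lookup(String txServiceGroup) {
        String key = "service.vgroupMapping." + txServiceGroup;
        String clusterName = configCenter.get(key);
        if (clusterName == null) {
            // Assumption for this sketch: no mapping means no instances.
            return Collections.emptyList();
        }
        return registry.getOrDefault(clusterName, Collections.emptyList());
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("service.vgroupMapping.default_tx_group", "default");
        Map<String, List<String>> registry = new HashMap<>();
        registry.put("default", Arrays.asList("127.0.0.1:8091"));
        System.out.println(new TcLookupSketch(config, registry).lookup("default_tx_group"));
    }
}
```

The indirection through the config center is what lets a transaction group be pointed at a different TC cluster by changing one configuration value, without touching the microservice.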