Seata源码分析 全局事务开启提交回滚流程

文章目录

  • Seata全局事务源码
    • Seata AT模式的设计思路
    • 源码入口
    • TransactionalTemplate
    • 开启全局事务
      • TM开启全局事务
      • TC处理TM的请求
    • 全局事务提交
      • 微服务端TM发送请求
      • TC处理TM的请求
      • RM处理TC的请求
    • 全局事务回滚
      • TM发送请求
      • TC处理TM的请求
      • RM处理TC的请求
    • 补充知识
      • 微服务怎么找TC服务

Seata全局事务源码

Seata源码版本为1.7.0

源码下载路径



Seata AT模式的设计思路

一阶段

这是分支事务主要的执行流程,不在本章节的源码分析范围内

业务数据和志记录在同回滚日一个本地事务中提交,释放本地锁和连接资源。核心在于对业务sql进行解析,转换成undolog,并同时入库

在这里插入图片描述



二阶段

分布式事务操作成功,则TC通知RM异步删除undolog

在这里插入图片描述



分布式事务操作失败,TM向TC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。

在这里插入图片描述



Seata生命周期图

在这里插入图片描述



总结核心点:

  • Seata是通过AOP wrapIfNecessary() --> MethodInterceptor ---> GlobalTransactionalInterceptor --> TransactionalTemplate

  • 全局事务相关的内容都是TM驱动的,它会向TC发送开始全局事务的请求并得到XID,整个业务方法执行,它会向TC发送全局提交/全局回滚的请求

  • TC再去调用RM对外提供的接口AbstractRMHandler#handle进行全局事务的提交/回滚

  • RM去操作undo_log数据表



源码入口

SeataClient启动流程 在线流程图

在这里插入图片描述

微服务使用Seata基本过程

引入依赖

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

配置文件

远程调用的方法上,使用@GlobalTransactional注解开启全局事务,其他下游服务仅仅使用@Transactional注解即可

@GlobalTransactional(name="createOrder",rollbackFor=Exception.class)
public Order saveOrder(OrderVo orderVo) {......storageFeignService.deduct(..);accountFeignService.debit(..);
}

通过Maven依赖,找sprin.facotries文件,再找自动配置类,进而找到核心Bean

在这里插入图片描述



通过SeataAutoConfiguration自动配置类,往Spring容器中添加GlobalTransactionScanner这个bean对象

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration {private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);@Bean(BEAN_NAME_FAILURE_HANDLER)@ConditionalOnMissingBean(FailureHandler.class)public FailureHandler failureHandler() {// 全局事务回滚失败,那么就会调用这个类的方法,只是打印日志,我们可以自定义FailureHandler类进行扩展return new DefaultFailureHandlerImpl();}// 创建GlobalTransactionScanner对象@Bean@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})@ConditionalOnMissingBean(GlobalTransactionScanner.class)public static GlobalTransactionScanner globalTransactionScanner(...) {......return new GlobalTransactionScanner(...);}
}



接下来看看GlobalTransactionScanner类的详情

public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {......
}

从它的父类AbstractAutoProxyCreator可知,它一定是和代理对象 AOP有关,再去查看核心方法GlobalTransactionScanner#wrapIfNecessary

从它的接口InitializingBean可知,它还有初始化相关的方法要执行GlobalTransactionScanner#afterPropertiesSet

我们先看看它的初始化方法:

  • 调用initClient()方法
  • 调用初始化TM和RM的init()方法
// 该方法只是会调用initClient()方法
public void afterPropertiesSet() {if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)this);return;}if (initialized.compareAndSet(false, true)) {// 会调用该方法initClient();}
}private void initClient() {if (LOGGER.isInfoEnabled()) {LOGGER.info("Initializing Global Transaction Clients ... ");}if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {LOGGER.warn(...);}if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {throw new IllegalArgumentException(...);}//init TM// 初始化TM事务管理器TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);//init RM// 初始化RM 资源管理器RMClient.init(applicationId, txServiceGroup);registerSpringShutdownHook();}



进入初始化TM和RM的init()方法后就能看到,他们是基于netty实现的,那么他们肯定就会有一些InboundHandler来处理请求。就拿RM举例

public class RMClient {public static void init(String applicationId, String transactionServiceGroup) {RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());// 这里的DefaultRMHandler,获取到的就是RMInboundHandler接口的实现类// 它会去处理BranchCommitRequest、BranchRollbackRequest、UndoLogDeleteRequest请求rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());rmNettyRemotingClient.init();}
}// 一般处理 TC发送过来的分支提交/分支回滚的请求都是该接口的实现类:AbstractRMHandler类进行处理的
public interface RMInboundHandler {BranchCommitResponse handle(BranchCommitRequest request);BranchRollbackResponse handle(BranchRollbackRequest request);void handle(UndoLogDeleteRequest request);
}



再看看GlobalTransactionScanner#wrapIfNecessary的核心方法:

  • TCC模式就创建TccActionInterceptor拦截器,其他模式就创建GlobalTransactionalInterceptor拦截器
  • 进入GlobalTransactionalInterceptor拦截器的invoke()方法中,获取@GlobalTransactional注解对象,会调用handleGlobalTransaction()方法
  • 从GlobalTransactionalInterceptor#handleGlobalTransaction进入到TransactionalTemplate#execute方法中
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {...try {synchronized (PROXYED_SET) {...interceptor = null;// 如果是TCC模式,那么就创建new TccActionInterceptor(..)拦截器if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);// 创建TccActionInterceptor拦截器interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);} else {......if (globalTransactionalInterceptor == null) {// AT模式、XA模式、Saga模式就创建GlobalTransactionalInterceptor拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}......}}
}// 进入到 GlobalTransactionalInterceptor 拦截器的invoke方法中 
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {Class<?> targetClass =methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);// 获取@GlobalTransactional注解对象final GlobalTransactional globalTransactionalAnnotation =getAnnotation(method, targetClass, GlobalTransactional.class);final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);if (!localDisable) {if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {AspectTransactional transactional;if (globalTransactionalAnnotation != null) {// 通过@GlobalTransactional注解对象 为 transactional 对象赋值transactional = new AspectTransactional(...);} else {transactional = this.aspectTransactional;}// 调用该方法return handleGlobalTransaction(methodInvocation, transactional);} else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation, globalLockAnnotation);}}}return methodInvocation.proceed();
}// 进入到transactionalTemplate.execute()方法中
Object handleGlobalTransaction(final MethodInvocation methodInvocation,final AspectTransactional aspectTransactional) throws Throwable {boolean succeed = true;try {// 进入到transactionalTemplate.execute()方法中return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {return methodInvocation.proceed();}public String name() {...}@Overridepublic TransactionInfo getTransactionInfo() {...}});} catch (TransactionalExecutor.ExecutionException e) {...} finally {if (ATOMIC_DEGRADE_CHECK.get()) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}
}



我们也可以直接在@GlobalTransactional注解这个上面的注释中直接跳转到下面这两个关键方法中去

  • GlobalTransactionScanner#wrapIfNecessary 这里就是整个GlobalTransaction注解AOP的入口
  • GlobalTransactionalInterceptor#handleGlobalTransaction 这里会去调用到transactionalTemplate#execute

在这里插入图片描述



TransactionalTemplate

接下来就进入到了事务模板方法中了

io.seata.tm.api.TransactionalTemplate.execute()

接下来全局事务相关的都是从这个事务模板方法作为入口

public Object execute(TransactionalExecutor business) throws Throwable {TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}GlobalTransaction tx = GlobalTransactionContext.getCurrent();Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {// ......// 传播机制相关的处理}GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());}try {// 开启全局事务beginTransaction(txInfo, tx);Object rs;try {// 执行业务方法rs = business.execute();} catch (Throwable ex) {// 3. 全局事务回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. 全局事务提交commitTransaction(tx, txInfo);return rs;} finally {//5. clearresumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}
}



开启全局事务

开启全局事务流程 在线流程图

在这里插入图片描述



TM开启全局事务

TransactionalTemplate#beginTransaction方法的具体实现:

  • 调用DefaultGlobalTransaction#begin方法
  • 判断当前角色是否为全局事务发起者,如果不是就不进行之后的流程了
  • 当前 xid != null 就抛异常
  • 通过TM向TC发送开启全局事务的请求,并获取到XID。DefaultTransactionManager#begin发送 GlobalBeginRequest 请求
  • 调整全局事务状态为Begin
  • 将全局事务XID保存至RootContext上下文对象中
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {triggerBeforeBegin();// 进入该方法tx.begin(txInfo.getTimeOut(), txInfo.getName());triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);}
}// DefaultGlobalTransaction#begin
public void begin(int timeout, String name) throws TransactionException {this.createTime = System.currentTimeMillis();// 判断当前角色是否为全局事务发起者,如果不是就不进行之后的流程了if (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}// 如果当前成员属性 xid != null 就抛异常assertXIDNull();// 正常流程 RootContext中目前是没有XID的String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}// 通过TM向TC发送开启全局事务的请求,并获取到XIDxid = transactionManager.begin(null, null, name, timeout);// 调整全局事务状态为Beginstatus = GlobalStatus.Begin;// 将全局事务XID保存至RootContext上下文对象中RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}
}// DefaultTransactionManager#begin
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 封装请求参数对象GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);// 调用TC,开启全局事务GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}// 返回XIDreturn response.getXid();
}



服务调用方的web拦截器中就会把XID从RootContext中取出来,存入请求头中,比如SeataFeignRequestInterceptor拦截器

下游服务就会在拦截器中把XID从请求头中取出来,存入RootContext对象中,比如SeataHandlerInterceptor 拦截器



TC处理TM的请求

TC处理TM发送过来的开启全局事务GlobalBeginRequest请求,在这个过程中,TC需要做的事:

  • 开启全局事务,构建一个GlobalSession对象 生成xid,这个对象就是全局事务global_table 数据表对应的实体对象
  • 存储全局事务信息,如果选择DB模式那么就往global_table数据表中插入数据
  • 返回XID



Seata Server端也就是TC这里该如何去找源码的入口嘞,我们知道seata是基于netty实现的,那么也就会有相应的入栈出栈handler,就比如RM就有一个RMInboundHandler接口来处理TC传过来的分支事务提交/回滚请求。那么我们也就应该能找到TCInboundHandler顶级接口

public interface TCInboundHandler {/*** Handle global begin response.*/GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);/*** Handle global commit response.*/GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);/*** Handle global rollback response.*/GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);
}



最终处理请求的入口为DefaultCoordinator#doGlobalBegin,整个开启全局事务的流程为:

  • 构建一个GlobalSession对象,这个过程中会生成一个全局事务XID
  • 调用GlobalSession对象的begin()方法,修改全局事务状态为begin、全局事务开始 开始时间
  • 再经过一段方法调用,往DB的global_table数据表中插入一条全局事务数据。
// DefaultCoordinator#doGlobalBegin
// 方法调用,并为response响应对象设置xid
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {// 调用begin()方法response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info(...);}
}// DefaultCore#begin
// 构建一个GlobalSession对象,这个过程中会生成一个全局事务XID;再调用session.begin()方法
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 构建一个GlobalSession对象,这个过程中会生成一个全局事务XIDGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);MDC.put(RootContext.MDC_KEY_XID, session.getXid());session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 调用begin()方法session.begin();MetricsPublisher.postSessionDoingEvent(session, false);return session.getXid();
}// GlobalSession#begin
@Override
public void begin() throws TransactionException {// 修改全局事务状态为beginthis.status = GlobalStatus.Begin;// 全局事务开始 开始时间this.beginTime = System.currentTimeMillis();// 这个属性值为true,在进行全局事务提交或回滚时会改为false,这样其他分支事务就不能继续注册了this.active = true;for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {// 进入onBegin()方法lifecycleListener.onBegin(this);}
}// AbstractSessionManager#onBegin
@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {// 进入该方法// 各个子类会重写下面的方法  DB 、 file 、 redis 相关的实现类会重写addGlobalSession()方法addGlobalSession(globalSession);
}// DataBaseSessionManager#addGlobalSession
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {// 调用writeSession()方法进行往global_table数据表中插入数据boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}} else {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}}
}// DataBaseTransactionStoreManager#writeSession
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {// 往DB global_table 数据表中插入数据// 调用至LogStoreDataBaseDAO#insertGlobalTransactionDO,操作数据库进行insert// 它支持Mysql、Oracle、PostgreSqlreturn logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else {throw new StoreException("Unknown LogOperation:" + logOperation.name());}
}



全局事务提交

全局事务提交 在线流程图

在这里插入图片描述



微服务端TM发送请求

当整个全局事务的业务方法都执行完成,没有出现异常那么就需要进行全局事务的提交了,TM会向TC发送一个全局事务提交的请求,当然发送全局事务提交的请求是默认有5次重试机制的

在这里插入图片描述



入口方法为TransactionalTemplate#commitTransaction,具体实现:

  • 超时相关的判断;如果超时则进入到全局事务回滚的流程;再调用至DefaultGlobalTransaction#commit方法
  • 当前需要是全局事务发起者角色才能进行全局事务的提交,如果是participate那么就不能进行全局事务的提交
  • XID必须存在,都进入到了全局事务提交的流程了,如果XID为null那么就抛异常
  • 重试机制,默认重试5次
    • 重试次数–
    • TM向TC发送全局事务提交的请求,transactionManager.commit(xid)
    • 进入到DefaultTransactionManager#commit方法,向RM发送GlobalCommitRequest请求
// TransactionalTemplate#commitTransaction
private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo) throws ... {// 超时机制的判断if (isTimeout(tx.getCreateTime(), txInfo)) {Exception exx = new TmTransactionException(...);// 全局事务回滚流程rollbackTransaction(tx, exx);return;}try {triggerBeforeCommit();// 进入该方法  DefaultGlobalTransaction#committx.commit();GlobalStatus afterCommitStatus = tx.getLocalStatus();TransactionalExecutor.Code code = TransactionalExecutor.Code.Unknown;...} catch (TransactionException txe) {...}
}// DefaultGlobalTransaction#commit
@Override
public void commit() throws TransactionException {// 当前需要是全局事务发起者角色才能进行全局事务的提交,如果是participate那么就不能进行全局事务的提交if (role == GlobalTransactionRole.Participant) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);}return;}// XID必须存在,如果为null那么就抛异常assertXIDNotNull();// 重试机制,默认重试5次int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {while (retry > 0) {try {// 重试次数--retry--;// TM向TC发送全局事务提交的请求, 调用至DefaultTransactionManager#commitstatus = transactionManager.commit(xid);break;} catch (Throwable ex) {...}}} finally {if (xid.equals(RootContext.getXID())) {suspend(true);}}
}// DefaultTransactionManager#commit
@Override
public GlobalStatus commit(String xid) throws TransactionException {// 封装GlobalCommitRequest请求对象GlobalCommitRequest globalCommit = new GlobalCommitRequest();globalCommit.setXid(xid);// 向TC发送请求GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);return response.getGlobalStatus();
}

这里,TM向TC发送了GlobalCommitRequest 请求



TC处理TM的请求

TC处理TM发送过来的全局事务提交GlobalCommitRequest请求,在这个过程中,TC需要做的事为:

  • 删除全局锁 lock_table 数据表中的数据
  • 调用各个分支事务,向它们发送分支事务提交BranchCommitRequest请求,驱动它们删除 undo_log 数据表中的数据
  • 删除分支事务 branch_table 数据表中的数据
  • 删除全局事务 global_table 数据表中的数据



入口方法为DefaultCoordinator#doGlobalCommit。整个请求处理的流程为:

  • 调用到DefaultCore#commit(xid)方法,通过xid获取到GlobalSession对象

  • 全局事务提交时,判断是否超时

  • 调用globalSession.closeAndClean()方法,关闭会话、删除全局锁。

    close()关闭会话,这样其他分支事务就不能继续注册了,把globalSession的action属性值设置为false

    clean() 根据xid 删除全局锁 lock_table 数据表中的数据

  • 调用globalSession.asyncCommit()方法,进入全局事务 异步提交逻辑。修改全局事务 global_table 数据表中的状态status字段的值

  • 处理异步提交,DefaultCoordinator类的init()方法中定时任务调用handleAsyncCommitting()方法,再调用DefaultCore#doGlobalCommit

  • 循环处理全局事务对应的所有分支事务

    • branchCommit()向RM发送请求,进行分支提交。得到分支提交后的状态
    • 正常情况下,分支事务提交后的状态为PhaseTwo_Committed,然后调用removeBranch()方法 删除 分支事务 branch_table 数据表中的数据
  • 将全局事务状态改为 Committed

  • 调用SessionHelper.endCommitted()方法 —> globalSession.end() 根据xid 删除全局锁 lock_table 数据表、全局事务 global_table 数据表中的数据



具体源码:

// DefaultCoordinator#doGlobalCommit
// 会调用DefaultCore#commit
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());// 调用core.commit()方法response.setGlobalStatus(core.commit(request.getXid()));
}// DefaultCore#commit
// 全局事务提交时,判断是否超时、关闭会话、根据xid删除全局锁lock_table数据表中的数据、全局事务 异步提交逻辑
@Override
public GlobalStatus commit(String xid) throws TransactionException {// 通过xid获取到GlobalSession对象GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}// 全局事务提交时,判断是否超时if (globalSession.isTimeout()) {LOGGER.info("TC detected timeout, xid = {}", globalSession.getXid());return GlobalStatus.TimeoutRollbacking;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {// 正常情况下,globalSession.getStatus()的值为begin状态if (globalSession.getStatus() == GlobalStatus.Begin) {// Highlight: Firstly, close the session, then no more branch can be registered.// close()关闭会话,这样其他分支事务就不能继续注册了,把globalSession的action属性值设置为false// clean() 根据xid 删除全局锁  lock_table 数据表中的数据globalSession.closeAndClean();if (globalSession.canBeCommittedAsync()) {// 全局事务 异步提交逻辑globalSession.asyncCommit();MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);return false;} else {globalSession.changeGlobalStatus(GlobalStatus.Committing);return true;}}return false;});// ......
}



看看closeAndClean方法的处理逻辑

  • close()关闭会话,这样其他分支事务就不能继续注册了,把globalSession的action属性值设置为false
  • clean() 根据xid 删除全局锁 lock_table 数据表中的数据
// GlobalSession#closeAndClean
public void closeAndClean() throws TransactionException {// 把globalSession的action属性值设置为falseclose();if (this.hasATBranch()) {// 根据xid 删除全局锁  lock_table 数据表中的数据clean();}
}public void clean() throws TransactionException {// 根据xid 删除全局锁 lock_table 数据表中的数据if (!LockerManagerFactory.getLockManager().releaseGlobalSessionLock(this)) {throw new TransactionException("UnLock globalSession error, xid = " + this.xid);}
}// DataBaseLockManager#releaseGlobalSessionLock
@Override
public boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException {try {// 根据xid 删除全局锁 lock_table 数据表中的数据return getLocker().releaseLock(globalSession.getXid());} catch (Exception t) {LOGGER.error("unLock globalSession error, xid:{}", globalSession.getXid(), t);return false;}
}// DataBaseLocker#releaseLock
@Override
public boolean releaseLock(String xid) {try {// 根据xid 删除全局锁 lock_table 数据表中的数据return lockStore.unLock(xid);} catch (StoreException e) {throw e;} catch (Exception t) {LOGGER.error("unLock by branchIds error, xid {}", xid, t);return false;}
}@Override
public boolean unLock(String xid) {Connection conn = null;PreparedStatement ps = null;try {conn = lockStoreDataSource.getConnection();conn.setAutoCommit(true);//batch release lock by branch list// 根据xid 删除全局锁 lock_table 数据表中的数据String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSqlByXid(lockTable);ps = conn.prepareStatement(batchDeleteSQL);ps.setString(1, xid);ps.executeUpdate();} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}return true;
}



接下来是处理全局事务异步提交的逻辑

DefaultCoordinator类的init()会有一个定时任务,循环调用全局事务异步提交的处理。

而真正处理异步提交的是DefaultCoordinator#handleAsyncCommitting

public void init() {retryRollbacking.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);retryCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);// 异步提交的任务,每秒执行 handleAsyncCommitting()方法asyncCommitting.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);timeoutCheck.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}



DefaultCoordinator#handleAsyncCommitting具体源码:

  • 向RM发送请求,进行分支提交
  • 删除 分支事务 branch_table 数据表中的数据
  • 删除全局锁 lock_table 数据表中的数据、 删除全局事务 global_table 数据表中的数据
protected void handleAsyncCommitting() {// 异步提交的方法// DefaultCoordinator类的init()方法中定时任务调用该方法SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);Collection<GlobalSession> asyncCommittingSessions =SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);if (CollectionUtils.isEmpty(asyncCommittingSessions)) {return;}SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {try {asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// 异步提交,进入该方法core.doGlobalCommit(asyncCommittingSession, true);} catch (TransactionException ex) {LOGGER.error(..);}});
}// DefaultCore#doGlobalCommit
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start committing eventMetricsPublisher.postSessionDoingEvent(globalSession, retrying);if (globalSession.isSaga()) {// saga流程success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);} else {// 循环处理全局事务对应的所有分支事务Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {if (!retrying && branchSession.canBeCommittedAsync()) {return CONTINUE;}BranchStatus currentStatus = branchSession.getStatus();if (currentStatus == BranchStatus.PhaseOne_Failed) {SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {// branchCommit()向RM发送请求,进行分支提交。得到分支提交后的状态BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);// 分支事务状态的判断if (isXaerNotaTimeout(globalSession,branchStatus)) {LOGGER.info(...);branchStatus = BranchStatus.PhaseTwo_Committed;}switch (branchStatus) {case PhaseTwo_Committed:// 调用removeBranch()方法// 删除 分支事务 branch_table 数据表中的数据SessionHelper.removeBranch(globalSession, branchSession, !retrying);LOGGER.info(...));return CONTINUE;case PhaseTwo_CommitFailed_Unretryable://not at branchSessionHelper.endCommitFailed(globalSession, retrying);LOGGER.error(...);return false;default:if (!retrying) {globalSession.queueToRetryCommit();return false;}if (globalSession.canBeCommittedAsync()) {LOGGER.error(...);return CONTINUE;} else {LOGGER.error(...);return false;}}} catch (Exception ex) {StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",new String[] {branchSession.toString()});if (!retrying) {globalSession.queueToRetryCommit();throw new TransactionException(ex);}}return CONTINUE;});if (result != null) {return result;}if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());return false;}}if (success && globalSession.getBranchSessions().isEmpty()) {if (!retrying) {// 将全局事务状态改为 CommittedglobalSession.setStatus(GlobalStatus.Committed);}// 进入该方法// 根据xid 删除全局锁 lock_table 数据表中的数据、 删除全局事务 global_table 数据表中的数据SessionHelper.endCommitted(globalSession, retrying);LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());}return success;
}// AbstractCore#AbstractCore
// Tc调用RM 进行分支事务提交 发送BranchCommitRequest请求
@Override
public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {try {// 封装BranchCommitRequest请求对象BranchCommitRequest request = new BranchCommitRequest();request.setXid(branchSession.getXid());request.setBranchId(branchSession.getBranchId());request.setResourceId(branchSession.getResourceId());request.setApplicationData(branchSession.getApplicationData());request.setBranchType(branchSession.getBranchType());// 发送RM,进行分支事务提交return branchCommitSend(request, globalSession, branchSession);} catch (IOException | TimeoutException e) {throw new BranchTransactionException(...);}
}// SessionHelper#removeBranch
public static void removeBranch(GlobalSession globalSession, BranchSession branchSession, boolean isAsync)throws TransactionException {// 根据 branch_id 再进行一次 删除全局锁 lock_table 数据表中的数据globalSession.unlockBranch(branchSession);if (isEnableBranchRemoveAsync() && isAsync) {COORDINATOR.doBranchRemoveAsync(globalSession, branchSession);} else {// 删除 分支事务 branch_table 数据表中的数据globalSession.removeBranch(branchSession);}
}// GlobalSession#removeBranch
@Override
public void removeBranch(BranchSession branchSession) throws TransactionException {for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {// 进入该方法// 删除 分支事务 branch_table 数据表中的数据lifecycleListener.onRemoveBranch(this, branchSession);}remove(branchSession);
}// AbstractSessionManager#onRemoveBranch
@Override
public void onRemoveBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {// 进入该方法// 各个子类会重写下面的方法  DB 、 file 、 redis 相关的实现类会重写removeBranchSession()方法// 删除 分支事务 branch_table 数据表中的数据removeBranchSession(globalSession, branchSession);
}// DataBaseSessionManager#removeBranchSession
@Override
public void removeBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {if (StringUtils.isNotBlank(taskName)) {return;}// 删除 分支事务 branch_table 数据表中的数据// 再往后就是执行sql相关的代码了boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_REMOVE, session);if (!ret) {throw new StoreException("removeBranchSession failed.");}
}// 分支事务删除后,便是删除全局事务表中的数据了
// SessionHelper#endCommitted
public static void endCommitted(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {if (retryGlobal || !DELAY_HANDLE_SESSION) {long beginTime = System.currentTimeMillis();boolean retryBranch = globalSession.getStatus() == GlobalStatus.CommitRetrying;globalSession.changeGlobalStatus(GlobalStatus.Committed);// 进入该方法// 根据xid 删除全局锁 lock_table 数据表中的数据、 删除全局事务 global_table 数据表中的数据globalSession.end();if (!DELAY_HANDLE_SESSION) {MetricsPublisher.postSessionDoneEvent(globalSession, false, false);}MetricsPublisher.postSessionDoneEvent(globalSession, IdConstants.STATUS_VALUE_AFTER_COMMITTED_KEY, true,beginTime, retryBranch);} else {if (globalSession.isSaga()) {globalSession.setStatus(GlobalStatus.Committed);globalSession.end();}MetricsPublisher.postSessionDoneEvent(globalSession, false, false);}
}// GlobalSession#end
@Override
public void end() throws TransactionException {if (GlobalStatus.isTwoPhaseSuccess(status)) {// Clean locks first// 再根据xid 删除全局锁 lock_table 数据表中的数据clean();for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {// 进入该方法// 根据xid 删除全局事务 global_table 数据表中的数据lifecycleListener.onSuccessEnd(this);}} else {for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onFailEnd(this);}}
}// AbstractSessionManager#onSuccessEnd
@Override
public void onSuccessEnd(GlobalSession globalSession) throws TransactionException {// 进入该方法// 各个子类会重写下面的方法  DB 、 file 、 redis 相关的实现类会重写removeBranchSession()方法// 根据xid 删除全局事务 global_table 数据表中的数据removeGlobalSession(globalSession);
}



RM处理TC的请求

RM处理TC发送过来的分支事务提交BranchCommitRequest请求,在这个过程中,RM需要做的事:

  • 删除undo_log表数据

处理分支事务提交请求的入口为AbstractRMHandler.handle(BranchCommitRequest request)。总流程为:

  • handler()方法调用doBranchCommit()方法,该方法再调用至资源管理器DataSourceManager#branchCommit

  • 异步分支事务提交 进入到AsyncWorker@branchCommit,将xid+branchId+resourceId封装为Phase2Context对象,添加进commitQueue

  • AsyncWorker@doBranchCommit处理队列中的Phase2Context对象,获取数据库连接对象、UndoLogManager。

  • 通过调用undoLogManager.batchDeleteUndoLog()批量删除undo_log

// AbstractRMHandler#handle
// 主要是调用doBranchCommit()方法
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {// 分支事务提交的响应对象BranchCommitResponse response = new BranchCommitResponse();// 进入到doBranchCommit方法中exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {@Overridepublic void execute(BranchCommitRequest request, BranchCommitResponse response)throws TransactionException {doBranchCommit(request, response);}}, request, response);return response;
}// AbstractRMHandler#doBranchCommit
// 先获取请求参数,然后调用资源管理器的方法,以AT模式举例 这里会进入到DataSourceManager#branchCommit
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {// 获取请求参数String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);}// 调用资源管理器进行分支事务提交,AT模式进入到DataSourceManager#branchCommitBranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,applicationData);response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch commit result: " + status);}}// DataSourceManager#branchCommit
// 调用AsyncWorker#branchCommit方法,异步处理分支提交请求
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {// 异步分支事务提交 进入到AsyncWorker#branchCommit// 返回BranchStatus.PhaseTwo_Committed 两阶段提交的状态return asyncWorker.branchCommit(xid, branchId, resourceId);
}
// AsyncWorker#branchCommit
// 调用AsyncWorker#addToCommitQueue方法,将请求参数封装为Phase2Context对象并进行入队操作
public BranchStatus branchCommit(String xid, long branchId, String resourceId) {Phase2Context context = new Phase2Context(xid, branchId, resourceId);// 添加进提交队列中addToCommitQueue(context);return BranchStatus.PhaseTwo_Committed;
}
// AsyncWorker#addToCommitQueue
// Phase2Context对象入队操作,如果队列满了那么就调用doBranchCommitSafely()方法去处理队列中的Phase2Context对象
private void addToCommitQueue(Phase2Context context) {// 入队,offer()方法,如果队列满了不抛异常,返回false,然后下面再去处理这个提交// 异步处理提交的方法为doBranchCommit()方法if (commitQueue.offer(context)) {return;}// 去调用从队列中取任务的方法,处理提交事务的任务,再把当前提交继续往commitQueue队列中添加CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor).thenRun(() -> addToCommitQueue(context));
}// 分支事务异步处理,入队操作完成后,接下来就是从队列去取对象进行处理了,通过undoLogManager对象进行删除undolog
// AsyncWorker#doBranchCommit
private void doBranchCommit() {// 提交事务队列为空就直接返回if (commitQueue.isEmpty()) {return;}// 取出当前队列中所有的二阶段提交上下文对象List<Phase2Context> allContexts = new LinkedList<>();commitQueue.drainTo(allContexts);// 进行分组Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);// 循环调用dealWithGroupedContexts(..)方法groupedContexts.forEach(this::dealWithGroupedContexts);
}
// 上面方法调用该方法
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {if (StringUtils.isBlank(resourceId)) {LOGGER.warn("resourceId is empty and will skip.");return;}DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);if (dataSourceProxy == null) {LOGGER.warn("failed to find resource for {} and requeue", resourceId);addAllToCommitQueue(contexts);return;}Connection conn = null;try {// 获取数据库连接对象conn = dataSourceProxy.getPlainConnection();// 获取UndoLogManager undolog管理器对象UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());// 将上下文拆分为几个列表,每个列表包含的元素不超过1000限制大小List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);for (List<Phase2Context> partition : splitByLimit) {// 进行删除undo_log数据表中的数据deleteUndoLog(conn, undoLogManager, partition);}} catch (SQLException sqlExx) {addAllToCommitQueue(contexts);LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx);} finally {IOUtil.close(conn);}}
// 通过UndoLogManager对象,调用batchDeleteUndoLog(xids, branchIds, conn)删除undo_log数据
private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {Set<String> xids = new LinkedHashSet<>(contexts.size());Set<Long> branchIds = new LinkedHashSet<>(contexts.size());contexts.forEach(context -> {xids.add(context.xid);branchIds.add(context.branchId);});try {// 通过UndoLogManager进行批量删除undo_logundoLogManager.batchDeleteUndoLog(xids, branchIds, conn);// 提交if (!conn.getAutoCommit()) {conn.commit();}} catch (SQLException e) {LOGGER.error("Failed to batch delete undo log", e);try {conn.rollback();addAllToCommitQueue(contexts);} catch (SQLException rollbackEx) {LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);}}
}



全局事务回滚

全局事务回滚在线流程图

在这里插入图片描述



TM发送请求

当整个全局事务的业务方法执行过程中出现异常那么就需要进行全局事务的回滚了,TM会向TC发送一个全局事务回滚的请求,当然请求是默认有5次重试机制的

在这里插入图片描述



入口方法为TransactionalTemplate#completeTransactionAfterThrowing,具体实现:

  • 校验异常,判断当前的异常是否需要回滚;如果异常校验不通过则进入全局事务提交的流程;再调用至DefaultGlobalTransaction#rollback方法
  • 当前需要是全局事务发起者角色才能进行全局事务的回滚,如果是participate那么就不能进行全局事务的回滚
  • XID必须存在,都进入到了全局事务回滚的流程了,如果XID为null那么就抛异常
  • 重试机制,默认重试5次
    • 重试次数–
    • TM向TC发送全局事务提交的请求,transactionManager.rollback(xid)
    • 进入到DefaultTransactionManager#rollback方法,向RM发送GlobalRollbackRequest请求
// 异常的校验,判断是否需要进行全局事务回滚,在调用至rollbackTransaction()方法
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)throws TransactionalExecutor.ExecutionException, TransactionException {// 校验异常,判断当前的异常是否需要回滚if (txInfo != null && txInfo.rollbackOn(originalException)) {// 全局事务回滚rollbackTransaction(tx, originalException);} else {// 全局事务提交commitTransaction(tx, txInfo);}
}// 该方法主要是调用至DefaultGlobalTransaction#rollback方法
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {try {triggerBeforeRollback();// 进入该方法 调用至DefaultGlobalTransaction#rollback方法tx.rollback();triggerAfterRollback();} catch (TransactionException txe) {// Failed to rollbackthrow new TransactionalExecutor.ExecutionException(...);}// ......}// DefaultGlobalTransaction#rollback方法
@Override
public void rollback() throws TransactionException {// 事务发起者角色校验,如果是participate则直接返回if (role == GlobalTransactionRole.Participant) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);}return;}// XID必须存在,如果为null那么就抛异常assertXIDNotNull();if (LOGGER.isInfoEnabled()) {LOGGER.info("transaction {} will be rollback", xid);}// 重试机制,默认重试5次int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;try {while (retry > 0) {try {// 重试次数--retry--;// TM向TC发送全局事务回滚的请求,调用至DefaultTransactionManager#rollbackstatus = transactionManager.rollback(xid);break;} catch (Throwable ex) {...}}} finally {if (xid.equals(RootContext.getXID())) {suspend(true);}}
}// DefaultTransactionManager#rollback
@Override
public GlobalStatus rollback(String xid) throws TransactionException {// 封装GlobalRollbackRequest请求对象GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();globalRollback.setXid(xid);// 向TC发送请求GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);return response.getGlobalStatus();
}



TC处理TM的请求

TC处理TM发送过来的全局事务回滚GlobalRollbackRequest请求,在这个过程中,TC需要做的事为:

  • 删除全局锁 lock_table 数据表中的数据
  • 调用各个分支事务,向它们发送分支事务回滚BranchRollbackRequest请求,驱动它们进行数据补偿,并删除 undo_log 数据表中的数据
  • 删除分支事务 branch_table 数据表中的数据
  • 删除全局事务 global_table 数据表中的数据



入口方法为DefaultCoordinator#doGlobalRollback。整个请求处理的流程为:

  • 调用到DefaultCore#rollback(xid)方法,通过xid获取到GlobalSession对象

  • globalSession.close()关闭会话,这样其他分支事务就不能继续注册了,把globalSession的action属性值设置为false

  • 改变全局事务状态为Rollbacking,再进入到doGlobalRollback()方法

  • 遍历当前全局事务所对应的分支事务

    • 如果当前分支事务是一阶段失败状态PhaseOne_Failed,那么就调用SessionHelper.removeBranch()删除全局锁 lock_table 数据表中的数据 、删除 分支事务 branch_table 数据表中的数据

      因为这个分支事务在执行自己的业务sql时就抛异常了,所以它的undo_log表中都还没有插入数据

    • branchRollback() TC 调用 RM 通知分支事务进行分支事务回滚

    • 回滚成功情况,调用SessionHelper.removeBranch()删除全局锁 lock_table 数据表中的数据 、删除 分支事务 branch_table 数据表中的数据

    • 回滚失败情况,修改全局事务状态, 再调用globalSession.end()方法 默认实现 打印错误日志信息

  • 进入到SessionHelper.endRollbacked() --> 修改全局事务状态 Rollbacked

  • 再调用globalSession.end()方法,根据xid 删除全局锁 lock_table 数据表、全局事务 global_table 数据表中的数据



具体源码实现:

// DefaultCoordinator#doGlobalRollback
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,RpcContext rpcContext) throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());// 调用core.rollback()response.setGlobalStatus(core.rollback(request.getXid()));
}// DefaultCore#rollback
@Override
public GlobalStatus rollback(String xid) throws TransactionException {// 通过xid获取到GlobalSession对象GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {return GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());// just lock changeStatusboolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {// 关闭会话,这样其他分支事务就不能继续注册了,把globalSession的action属性值设置为falseglobalSession.close(); // 改变全局事务状态为Rollbackingif (globalSession.getStatus() == GlobalStatus.Begin) {globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);return true;}return false;});if (!shouldRollBack) {return globalSession.getStatus();}// 进入到doGlobalRollback()方法boolean rollbackSuccess = doGlobalRollback(globalSession, false);return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}// DefaultCore#doGlobalRollback
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {boolean success = true;// start rollback eventMetricsPublisher.postSessionDoingEvent(globalSession, retrying);if (globalSession.isSaga()) {// Saga模式的处理success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);} else {// 遍历当前全局事务所对应的分支事务Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {BranchStatus currentBranchStatus = branchSession.getStatus();// 如果当前分支事务是一阶段失败状态if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {// 删除全局锁 lock_table 数据表中的数据 、删除 分支事务 branch_table 数据表中的数据SessionHelper.removeBranch(globalSession, branchSession, !retrying);return CONTINUE;}try {// 通知分支事务进行分支事务回滚BranchStatus branchStatus = branchRollback(globalSession, branchSession);// 修改分支事务状态if (isXaerNotaTimeout(globalSession, branchStatus)) {LOGGER.info(...);branchStatus = BranchStatus.PhaseTwo_Rollbacked;}switch (branchStatus) {case PhaseTwo_Rollbacked:// 进入removeBranch()方法// 删除全局锁 lock_table 数据表中的数据 、删除 分支事务 branch_table 数据表中的数据SessionHelper.removeBranch(globalSession, branchSession, !retrying);LOGGER.info(...);return CONTINUE;case PhaseTwo_RollbackFailed_Unretryable:// 两阶段回滚失败的处理流程// 修改全局事务状态, 再调用globalSession.end()方法  默认实现 打印错误日志信息SessionHelper.endRollbackFailed(globalSession, retrying);LOGGER.error(...);// 这里就返回false了return false;default:LOGGER.error(...);if (!retrying) {globalSession.queueToRetryRollback();}return false;}} catch (Exception ex) {// ...throw new TransactionException(ex);}});// Return if the result is not nullif (result != null) {return result;}}// 在db模式下,可能会出现锁和分支数据残留问题。因此,这里的执行需要延迟,并且不能同步执行。if (success) {SessionHelper.endRollbacked(globalSession, retrying);LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());}return success;
}



上面调用方法的具体实现如下:

// SessionHelper#removeBranch
// 删除全局锁 lock_table 数据表中的数据 、删除 分支事务 branch_table 数据表中的数据
public static void removeBranch(GlobalSession globalSession, BranchSession branchSession, boolean isAsync)throws TransactionException {// 再进行一次 删除全局锁 lock_table 数据表中的数据globalSession.unlockBranch(branchSession);if (isEnableBranchRemoveAsync() && isAsync) {COORDINATOR.doBranchRemoveAsync(globalSession, branchSession);} else {// 删除 分支事务 branch_table 数据表中的数据globalSession.removeBranch(branchSession);}
}// DefaultCore#branchRollback
// 调用branchRollback()方法 TC 调用 RM 进行分支事务回滚
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {// 进入branchRollback()方法  TC 调用 RM 进行分支事务回滚return getCore(branchSession.getBranchType()).branchRollback(globalSession, branchSession);
}
// AbstractCore#branchRollback
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {try {// 封装分支事务回滚请求对象 BranchRollbackRequestBranchRollbackRequest request = new BranchRollbackRequest();request.setXid(branchSession.getXid());request.setBranchId(branchSession.getBranchId());request.setResourceId(branchSession.getResourceId());request.setApplicationData(branchSession.getApplicationData());request.setBranchType(branchSession.getBranchType());// TC 调用 RM 发送请求return branchRollbackSend(request, globalSession, branchSession);} catch (IOException | TimeoutException e) {throw new BranchTransactionException(...);}
}// SessionHelper#endRollbacked
// 修改全局事务状态为 Rollbacked。
// 调用 end()方法,再根据xid 删除全局锁 lock_table 数据表、全局事务 global_table 数据表中的数据
public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {if (retryGlobal || !DELAY_HANDLE_SESSION) {long beginTime = System.currentTimeMillis();boolean timeoutDone = false;GlobalStatus currentStatus = globalSession.getStatus();if (currentStatus == GlobalStatus.TimeoutRollbacking) {MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.TimeoutRollbacked, false, false);timeoutDone = true;}boolean retryBranch =currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);} else {// 修改全局事务状态为 RollbackedglobalSession.changeGlobalStatus(GlobalStatus.Rollbacked);}// 调用 end()方法// 再根据xid 删除全局锁 lock_table 数据表、全局事务 global_table 数据表中的数据globalSession.end();if (!DELAY_HANDLE_SESSION && !timeoutDone) {MetricsPublisher.postSessionDoneEvent(globalSession, false, false);}MetricsPublisher.postSessionDoneEvent(globalSession, IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY, true,beginTime, retryBranch);} else {if (globalSession.isSaga()) {globalSession.setStatus(GlobalStatus.Rollbacked);globalSession.end();}MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Rollbacked, false, false);}
}



RM处理TC的请求

RM处理TC发送过来的分支事务回滚BranchCommitRequest请求,在这个过程中,RM需要做的事:

  • 查询undo_log表数据
    • beforeimage 和 after image进行比较,如果相等就不需要进行数据补偿,如果不相等就继续判断
    • after image和 current 当前数据比较,如果相等就去生成逆向sql进行数据补偿,如果不相等就继续判断
    • before image和current当前数据比较,如果相等那么也就不需要进行数据补偿,如果也不相等那么就抛异常
  • 删除undo_log表数据



处理分支事务回滚请求的入口为AbstractRMHandler.handle(BranchRollbackRequest request)。总流程为:

  • AbstractRMHandler#handle方法会调用至doBranchRollback(..)方法

  • 从request对象中获取请求参数,调用RM资源管理器的分支事务回滚方法,AT模式进入到DataSourceManager#branchRollback

  • 获取到UndoLogManager对象,并调用它的undo(..)方法,调用至AbstractUndoLogManager#undo

  • 执行查询语句,查询到branchId+xid对应的undo_log表数据;读取context字段值,保存着before image和after image

  • 从after image中读取List<SQLUndoLog> 集合,并遍历该集合,循环体中调用至AbstractUndoExecutor#executeOn

  • 调用AbstractUndoExecutor#dataValidationAndGoOn判断是否需要执行undo sql进行数据恢复,也就是check after image流程。

    • beforeRecords 和 afterRecords进行比较,如果相等就不需要执行undo sql进行数据恢复
    • 查询当前数据表中的数据
    • 如果afterRecords和 currentRecords 当前数据比较,如果相等就去执行 undo sql进行数据补偿,如果不相等则继续判断
    • beforeRecords和 currentRecords 当前数据比较,如果相等那么也就不需要进行数据恢复,否则抛SQLUndoDirtyException
  • 经过上方的判断,如果没有return,则进入到这个流程:生成 undo sql、执行undo sql

  • 最后删除undo log表数据



// AbstractRMHandler#handle  方法会调用至`doBranchRollback(..)`方法
@Override
public BranchRollbackResponse handle(BranchRollbackRequest request) {// 分支事务回滚的响应对象BranchRollbackResponse response = new BranchRollbackResponse();// 进入到doBranchRollback()方法中exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {@Overridepublic void execute(BranchRollbackRequest request, BranchRollbackResponse response)throws TransactionException {doBranchRollback(request, response);}}, request, response);return response;
}// 从request对象中获取请求参数,调用RM资源管理器的分支事务回滚方法,AT模式进入到`DataSourceManager#branchRollback`
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)throws TransactionException {// 获取请求参数String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);}// 调用RM资源管理器的分支事务回滚方法,AT模式进入到DataSourceManager#branchRollbackBranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,applicationData);response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch Rollbacked result: " + status);}
}// DataSourceManager#branchRollback
// 获取到UndoLogManager对象,并调用它的undo(..)方法,调用至AbstractUndoLogManager#undo
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {DataSourceProxy dataSourceProxy = get(resourceId);if (dataSourceProxy == null) {throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));}try {// 获取到UndoLogManager对象,并调用它的undo(..)方法,调用至AbstractUndoLogManager#undoUndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);if (LOGGER.isInfoEnabled()) {LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);}} catch (TransactionException te) {// ......}return BranchStatus.PhaseTwo_Rollbacked;}// AbstractUndoLogManager#undo
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {ConnectionProxy connectionProxy = null;Connection conn = null;ResultSet rs = null;PreparedStatement selectPST = null;boolean originalAutoCommit = true;for (; ; ) {try {connectionProxy = dataSourceProxy.getConnection();// 获取数据库连接对象conn = connectionProxy.getTargetConnection();if (originalAutoCommit = conn.getAutoCommit()) {conn.setAutoCommit(false);}// 通过查询sql语句,获得PreparedStatement对象selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);selectPST.setLong(1, branchId);selectPST.setString(2, xid);// 执行查询语句,得到查询数据rs = selectPST.executeQuery();boolean exists = false;while (rs.next()) {exists = true;int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);if (!canUndo(state)) {if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);}return;}// undo_log数据表中的context字段值,保存着before image和after imageString contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);Map<String, String> context = parseContext(contextString);byte[] rollbackInfo = getRollbackInfo(rs);// 序列化String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);// 取undolog对象UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance(): UndoLogParserFactory.getInstance(serializer);BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);try {setCurrentSerializer(parser.getName());// 得到branchId与xid对应的那条undo_log数据表记录中的所有SQLUndoLog对象List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();if (sqlUndoLogs.size() > 1) {Collections.reverse(sqlUndoLogs);}// 遍历SQLUndoLog集合for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());sqlUndoLog.setTableMeta(tableMeta);AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);// 进入该方法,该方法内会进行check after image相关的判断undoExecutor.executeOn(connectionProxy);}} finally {// remove serializer nameremoveCurrentSerializer();}}if (exists) {// 删除undo_log表数据deleteUndoLog(xid, branchId, conn);conn.commit();if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,State.GlobalFinished.name());}} else {insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);conn.commit();if (LOGGER.isInfoEnabled()) {LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,State.GlobalFinished.name());}}return;} catch (SQLIntegrityConstraintViolationException e) {//......} catch (Throwable e) {//......} finally {//......}}
}// AbstractUndoExecutor#executeOn
public void executeOn(ConnectionProxy connectionProxy) throws SQLException {Connection conn = connectionProxy.getTargetConnection();// 判断是否需要进行undoLog的执行,after image == current 才会走之后的处理流程进行数据恢复if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(connectionProxy)) {return;}PreparedStatement undoPST = null;try {// 生成undo  Sql语句String undoSQL = buildUndoSQL();// 获取PreparedStatement对象undoPST = conn.prepareStatement(undoSQL);// sql参数的处理TableRecords undoRows = getUndoRows();for (Row undoRow : undoRows.getRows()) {ArrayList<Field> undoValues = new ArrayList<>();List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, connectionProxy.getDbType());for (Field field : undoRow.getFields()) {if (field.getKeyType() != KeyType.PRIMARY_KEY) {undoValues.add(field);}}undoPrepare(undoPST, undoValues, pkValueList);// 执行sqlundoPST.executeUpdate();}} catch (Exception ex) {if (ex instanceof SQLException) {throw (SQLException) ex;} else {throw new SQLException(ex);}}finally {//important for oracleIOUtil.close(undoPST);}}protected boolean dataValidationAndGoOn(ConnectionProxy conn) throws SQLException {TableRecords beforeRecords = sqlUndoLog.getBeforeImage();TableRecords afterRecords = sqlUndoLog.getAfterImage();// beforeRecords 和 afterRecords进行比较,如果相等就不需要执行undo sql进行数据恢复Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);if (beforeEqualsAfterResult.getResult()) {return false;}// 查询当前数据表中的数据TableRecords currentRecords = queryCurrentRecords(conn);// 如果afterRecords和 currentRecords 当前数据比较,如果相等就去执行 undo  sql进行数据补偿,如果不相等则继续判断Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);if (!afterEqualsCurrentResult.getResult()) {// beforeRecords和 currentRecords 当前数据比较,如果相等那么也就不需要进行数据恢复,否则抛SQLUndoDirtyExceptionResult<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);if (beforeEqualsCurrentResult.getResult()) {return false;} else {//...throw new SQLUndoDirtyException("Has dirty records when undo.");}}// 老老实实去执行undo sql,进行数据恢复return true;
}



补充知识

微服务怎么找TC服务

源码入口在NacosRegistryServiceImpl.lookup(String key)方法

public List<InetSocketAddress> lookup(String key) throws Exception {// 根据service.vgroupMapping.${seata.tx-service-group} 作为key去配置中心找value// 得到TC 这个微服务service下的clusterName// 在通过clusterName去服务注册中心找TCservice的instanceString clusterName = getServiceGroup(key);//......if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {synchronized (LOCK_OBJ) {if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {List<String> clusters = new ArrayList<>();clusters.add(clusterName);// 根据ServiceName、ServiceGroup、clusterName去注册中心找instanceList<Instance> firstAllInstances = getNamingInstance().getAllInstances(getServiceName(), getServiceGroup(), clusters);//......}}}return CLUSTER_ADDRESS_MAP.get(clusterName);
}



所以,第一步先获取${seata.tx-service-group}配置项的值

在这里插入图片描述



得到了service.vgroupMapping.default_tx_group这么一个字符串,然后把这个作为key,去配置中心找值,这里找到的就是TC的clusterName

在这里插入图片描述

在这里插入图片描述



这里就得到了TC的集群名字,根据ServiceName、ServiceGroup、clusterName去注册中心找instance

在这里插入图片描述



最终找到TC service的instance

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3246901.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

配置三层链路聚合增加链路带宽并提高可靠性的示例

规格 适用于所有版本的AR路由器。 AR161、AR161W、AR169、AR161G-L不支持该示例。 组网需求 在某小型企业网环境中部署了两台AR路由器Router_1和Router_2&#xff0c;Router_1作为用户接入设备&#xff0c;Router_2作为网络接入设备。为了保证用户的带宽&#xff0c;当用户量…

【Kaggle】练习赛《保险交叉销售的二分类预测》

前言 本篇文章介绍的是Kaggle月赛《Binary Classification of Insurance Cross Selling》&#xff0c;即《保险交叉销售的二元分类预测》。这场比赛非常适合作为机器学习入门者的实践练习。在之前的几期练习赛中&#xff0c;我们从多个角度详细讲解了探索性数据分析&#xff0…

爆火出圈的Robotaxi,会是自动驾驶的最优解吗?

八年前&#xff0c;百度决定投资无人驾驶时&#xff0c;李彦宏说&#xff1a;“它是人工智能最顶级的工程&#xff0c;将彻底改变人类的出行和生活。” 八年后&#xff0c;萝卜快跑从理想变成现实&#xff0c;奔跑在全国各地的街头&#xff0c;诠释了什么叫“科技不该高高在上…

2.javaWeb_请求和响应的处理(Request,Response)

2.请求和响应的处理 文章目录 2.请求和响应的处理一、动态资源和静态资源javax.servlet(包) 二、Servlet体系1.简介2.HttpServlet3.Servlet生命周期 三、Request对象1.ServletRequest1)ServletRequest主要功能有&#xff1a;2)ServletRequest类的常用方法: 2.HttpServletReques…

72B大模型分片部署

一、定义 目的官方教程案例小模型修改device_map 方式二 二、实现 目的&#xff1a; 将72B大模型 部署到2张gpu 显卡中。官方教程 帖子&#xff1a;https://huggingface.co/blog/accelerate-large-models实现 1. 自动部署 model AutoModelForCausalLM.from_pretrained(mod…

JUC 包中的 Atomic 原子类总结

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

【Java数据结构】初始线性表之一:链表

为什么要有链表 上一节我们描述了顺序表&#xff1a;【Java数据结构】初识线性表之一&#xff1a;顺序表-CSDN博客 并且进行了简单模拟实现。通过源码知道&#xff0c;ArrayList底层使用数组来存储元素。 由于其底层是一段连续空间&#xff0c;当在ArrayList任意位置插入或者…

Linux shell编程学习笔记64:vmstat命令 获取进程、内存、虚拟内存、IO、cpu等信息

0 前言 在系统安全检查中&#xff0c;通常要收集进程、内存、IO等信息。Linux提供了功能众多的命令来获取这些信息。今天我们先研究vmstat命令。 1.vmstat命令的功能、用法、选项说明和注意事项 1.1 vmstat命令的功能 vmstat是 Virtual Meomory Statistics&#xff08;虚拟内…

4.作业--Jquery,JS

目录 作业题目&#xff1a;1.使用Jquery完成点击图片变换图片颜色 A图 B代码 HTML的部分 JQ的部分 作业题目&#xff1a;2.使用JS中的DOM操作完成背景颜色渐变方向变换。点击背景&#xff0c;渐变方向发生改变。 A图 B代码 学习产出&#xff1a; 作业题目&#xff1a;1…

封装网络请求 鸿蒙APP HarmonyOS ArkTS

一、效果展示 通过在页面直接调用 userLogin(params) 方法&#xff0c;获取登录令牌 二、申请网络权限 访问网络时候首先需要申请网络权限&#xff0c;需要修改 src/main 目录下的 module.json5 文件&#xff0c;加入 requestPermissions 属性&#xff0c;详见官方文档 【声明权…

深度学习Week20——Pytorch实现残差网络和ResNet50V2算法

文章目录 深度学习Week20——Pytorch实现残差网络和ResNet50V2算法 一、前言 二、我的环境 三、代码复现 3.1 配置数据集 3.2 构建模型 四、模型应用与评估 4.1 编写训练函数 4.2 编写测试函数 4.3 训练模型 4.4 结果可视化 一、前言 &#x1f368; 本文为&#x1f517;365天深…

昇思25天学习打卡营第 12 天 | mindspore 实现 ResNet50 图像分类

1. 背景&#xff1a; 使用 mindspore 学习神经网络&#xff0c;打卡第 12 天&#xff1b;主要内容也依据 mindspore 的学习记录。 2. ResNet 介绍&#xff1a; mindspore 实现 ResNet50 图像分类&#xff1b; ResNet 基本介绍&#xff1a; Residual Networks 是微软研究院 K…

港股指数实时行情API接口

港股 指数 实时 行情 API接口 # Restful API https://tsanghi.com/api/fin/index/HKG/realtime?token{token}&ticker{ticker}指定指数代码&#xff0c;获取该指数的实时行情&#xff08;开、高、低、收、量&#xff09;。 更新周期&#xff1a;实时。 请求方式&#xff1a…

GuLi商城-商品服务-API-属性分组-分组修改级联选择器回显

前端代码:略 后端回显接口: 递归方法: @Override publi

linux进程——父子进程层面的PID,fork的原理与理解

前言&#xff1a;本篇内容主要讲解进程中系统调用fork和父子进程的概念与原理&#xff0c; 想要系统学习linux进程的友友们只管看本篇文章是不行的。 还要学习一些linux进程的周边知识以及linux进程其他方面的知识&#xff0c;博主的linux专栏中已经加入了这些文章方便友友们进…

连锁零售门店分析思路-人货场 数据分析

连锁零售门店分析思路 以下是一个连锁零售门店的分析思路&#xff1a; 一、市场与竞争分析 二、门店运营分析&#xff08;销售分析&#xff09; 三、销售与财务分析 四、客户分析 五、数字化与营销分析 最近帮一个大学生培训&#xff0c;就门店销售分析 &#xff0c;说到门店…

记录些MySQL题集(8)

ACID原则、事务隔离级别及事务机制原理 一、事务的ACID原则 什么是事务呢&#xff1f;事务通常是由一个或一组SQL组成的&#xff0c;组成一个事务的SQL一般都是一个业务操作&#xff0c;例如聊到的下单&#xff1a;「扣库存数量、增加订单详情记录、插入物流信息」&#xff0…

Css布局-伸缩盒笔记

前言 伸缩盒作为css3中的布局标准&#xff0c;不得不学呀&#xff0c;跟着b站yu神走一遍&#xff0c;yushen牛逼&#xff01; 伸缩盒子布局的优势 当然是伸缩了 伸缩容器与伸缩项目 display: flex display: inline-flex &#xff08;用的少&#xff09; 一个html元素既可以是…

我们距离通用人工智能还有多远?当它诞生后,会给社会发展带来哪些变革?

当我们谈论通用人工智能&#xff08;AGI&#xff09;&#xff0c;我们指的是一种能够像人类一样执行各种认知任务的人工智能系统。目前&#xff0c;我们所拥有的人工智能技术主要是狭义人工智能&#xff08;ANI&#xff09;&#xff0c;专注于特定任务&#xff0c;如语音识别、…

老司机减分宝典助手-学法减分扣分题目及答案 #经验分享#经验分享#职场发展

学法减分其实就是把我们驾驶证上面的分数一分一分地找回来&#xff0c;为什么说是一分一分地找回来呢&#xff1f;因为必须先把违章处理完才可以&#xff0c;无论这辆车是不是你的&#xff0c;无论这辆车挂靠在谁的公司名下或者是单位名下&#xff0c;你都可以把这个分找回来&a…