分布式事务之最终一致性

分布式事务之最终一致性

    • 参考链接
    • 分布式事务基础理论
    • 概述案例
    • 解决方案:RocketMQ可靠消息
    • 注意事项:
    • 代码实现

参考链接

原文链接:https://blog.csdn.net/jikeyeka/article/details/126296938

分布式事务基础理论

基于上述的CAP和BASE理论,一般情况下会保证P和A,舍弃C,保证最终一致性。最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的"支付中"状态,最终会变为“支付 成功”或者"支付失败",使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。

概述案例

此方案的核心是将分布式事务拆分成多个本地事务,然后通过网络由消息队列协调完成所有事务,并实现最终一致性。以商城下单为例:
在这里插入图片描述

  1. 消息发送方,用户下单: 创建订单,然后通过网络发送消息到MQ

  2. 消息接收方,扣减库存: 通过网络从MQ中接收消息,然后扣减库存

该解决方案容易理解,实现成本低,但是面临以下几个问题:

1.消息发送方执行本地事务与发送消息的原子性问题,也就是说如何保证本地事务执行成功,消息一定发送成功

begin transaction
1.数据库操作
2.发送消息
commit transation

这种情况下,貌似没有问题,如果发送消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但此时消息已经正常发送了,同样会导致不一致。

2.消息接收方接收消息与本地事务的原子性问题,也就是说如何保证接收消息成功后,本地事务一定执行成功

3.由于消息可能会重复发送,这就要求消息接收方必须实现幂等性

由于在生产环境中,消费方很有可能是个集群,若某一个消费节点超时但是消费成功,会导致集群同组 其他节点重复消费该消息。另外意外宕机后恢复,由于消费进度没有及时写入磁盘,会导致消费进度部 分丢失,从而导致消息重复消费。

解决方案:RocketMQ可靠消息

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便 利性支持。因此,我们通过RocketMQ就可以解决前面的问题。

1.消息发送方执行本地事务与发送消息的原子性问题,也就是说如何保证本地事务执行成功,消息一定发送成功

RocketMQ中的Broker 与 发送方 具备双向通信能力,使得 broker 天生可以作为一个事务协调者存在;并且RocketMQ 本身提供了存储机制,使得事务消息可以持久化保存;这些优秀的设计可以保证即使发生了异常,RocketMQ依然能够保证达成事务的最终一致性。
在这里插入图片描述

  1. 发送方发送一个事务消息给Broker,RocketMQ会将消息状态标记为“Prepared”,此时这条消息暂时不能被接收方消费。这样的消息称之为Half Message,即半消息。

  2. Broker返回发送成功给发送方

  3. 发送方执行本地事务,例如操作数据库

  4. 若本地事务执行成功,发送commit消息给Broker,RocketMQ会将消息状态标记为“可消费”,此 时这条消息就可以被接收方消费;若本地事务执行失败,发送rollback消息给Broker,RocketMQ 将删除该消息

  5. 如果发送方在本地事务过程中,出现服务挂掉,网络闪断或者超时,那Broker将无法收到确认结 果

  6. 此时RocketMQ将会不停的询问发送方来获取本地事务的执行状态(即事务回查)

  7. 根据事务回查的结果来决定Commit或Rollback,这样就保证了消息发送与本地事务同时成功或同时失败。

以上主干流程已由RocketMQ实现,对于我们来说只需要分别实现本地事务执行的方法以及本地事务回查的方法即可,具体来说就是实现下面这个接口:

public interface TransactionListener {
/**
- 发送prepare消息成功后回调该方法用于执行本地事务
- @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
- @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,
这里能获取到
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:未知,需要回查
*/
LocalTransactionState executeLocalTransaction(final Message msg, final
Object arg);
/**
- @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:未知,需要回查
*/
LocalTransactionState checkLocalTransaction(Message msg);
}

2.消息接收方接收消息与本地事务的原子性问题,也就是说如何保证接收消息成功后,本地事务一定执行成功

  1. 如果是出现了异常,RocketMQ会通过重试机制,每隔一段时间消费消息,然后再执行本地事务;如果 是超时,RocketMQ就会无限制的消费消息,不断的去执行本地事务,直到成功为止。
  2. 实现逆向流程,比如下单流程:在订单服务执行本地事务成功,消息commit, 然后 商品服务 消费消息进行扣减库存,在商品服务扣减库存过程中发生异常,那么先将本地数据回滚,然后发送回滚消息,在订单服务监听回滚消息进行处理。

在这里插入图片描述

注意事项:

  1. 因为使用最终一致性解决分布式事务,可能出现的问题:
    发送方本地事务执行完成,消息commit, 这时候返回客户是成功,但是消费者可能失败,会导致数据回滚,所以为了避免用户看到下单成功后,因为数据回滚导致订单数据消失,所以需要增加一个中间状态,比如:下单中,只有所有的消费者消费成功后,在改变订单状态为下单成功。

  2. 发送方监听事务消息 RocketMQLocalTransactionListener, 一个工程只可以有一个实现类,否则启动报错。

代码实现

  1. 发送方代码
 public Long placeOrder(Long userId, SeckillOrderCommand seckillOrderCommand) {SeckillGoodsDTO seckillGoods = seckillGoodsDubboService.getSeckillGoods(seckillOrderCommand.getGoodsId(), seckillOrderCommand.getVersion());//检测商品this.checkSeckillGoods(seckillOrderCommand, seckillGoods);boolean exception = false;long txNo = SnowFlakeFactory.getSnowFlakeFromCache().nextId();String key = SeckillConstants.getKey(SeckillConstants.GOODS_ITEM_STOCK_KEY_PREFIX, String.valueOf(seckillOrderCommand.getGoodsId()));try{//获取商品限购信息Object limitObj = distributedCacheService.getObject(SeckillConstants.getKey(SeckillConstants.GOODS_ITEM_LIMIT_KEY_PREFIX, String.valueOf(seckillOrderCommand.getGoodsId())));//如果从Redis获取到的限购信息为null,则说明商品已经下线if (limitObj == null){throw new SeckillException(ErrorCode.GOODS_OFFLINE);}if (Integer.parseInt(String.valueOf(limitObj)) < seckillOrderCommand.getQuantity()){throw new SeckillException(ErrorCode.BEYOND_LIMIT_NUM);}Long result = distributedCacheService.decrementByLua(key, seckillOrderCommand.getQuantity());this.checkResult(result);}catch (Exception e){logger.error("SeckillPlaceOrderLuaService|下单异常|参数:{}|异常信息:{}", JSONObject.toJSONString(seckillOrderCommand), e.getMessage());exception = true;//将内存中的库存增加回去distributedCacheService.incrementByLua(key, seckillOrderCommand.getQuantity());}//事务消息Message<String> message = this.getTxMessage(txNo, userId, SeckillConstants.PLACE_ORDER_TYPE_LUA, exception, seckillOrderCommand, seckillGoods);//发送事务消息rocketMQTemplate.sendMessageInTransaction(SeckillConstants.TOPIC_TX_MSG, message, null);return txNo;}
  1. 发送方本地事务
/*** @author binghe(微信 : hacker_binghe)* @version 1.0.0* @description 监听事务消息* @github https://github.com/binghe001* @copyright 公众号: 冰河技术*/
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener{private final Logger logger = LoggerFactory.getLogger(OrderTxMessageListener.class);@Autowiredprivate SeckillPlaceOrderService seckillPlaceOrderService;@Autowiredprivate DistributedCacheService distributedCacheService;@Override@Transactional(rollbackFor = Exception.class)public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {TxMessage txMessage = this.getTxMessage(message);try{//已经抛出了异常,则直接回滚if (BooleanUtil.isTrue(txMessage.getException())){return RocketMQLocalTransactionState.ROLLBACK;}seckillPlaceOrderService.saveOrderInTransaction(txMessage);
//            int i = 1/0;logger.info("executeLocalTransaction|秒杀订单微服务成功提交本地事务|{}", txMessage.getTxNo());return RocketMQLocalTransactionState.COMMIT;}catch (Exception e){logger.error("executeLocalTransaction|秒杀订单微服务异常回滚事务|{}",txMessage.getTxNo());return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {TxMessage txMessage = this.getTxMessage(message);logger.info("checkLocalTransaction|秒杀订单微服务查询本地事务|{}", txMessage.getTxNo());Boolean submitTransaction = distributedCacheService.hasKey(SeckillConstants.getKey(SeckillConstants.ORDER_TX_KEY, String.valueOf(txMessage.getTxNo())));return BooleanUtil.isTrue(submitTransaction) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.UNKNOWN ;}private TxMessage getTxMessage(Message msg){String messageString = new String((byte[]) msg.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String txStr = jsonObject.getString(SeckillConstants.TX_MSG_KEY);return JSONObject.parseObject(txStr, TxMessage.class);}
}
  1. 消费者监听事务消息
@Component
@RocketMQMessageListener(consumerGroup = SeckillConstants.TX_GOODS_CONSUMER_GROUP, topic = SeckillConstants.TOPIC_TX_MSG)
public class GoodsTxMessageListener implements RocketMQListener<String> {private final Logger logger = LoggerFactory.getLogger(GoodsTxMessageListener.class);@Autowiredprivate SeckillGoodsService seckillGoodsService;@Overridepublic void onMessage(String message) {if (StrUtil.isEmpty(message)){return;}logger.info("秒杀商品微服务开始消费事务消息:{}", message);TxMessage txMessage = this.getTxMessage(message);//如果协调的异常信息字段为false,订单微服务没有抛出异常,则处理库存信息if (BooleanUtil.isFalse(txMessage.getException())){seckillGoodsService.updateAvailableStock(txMessage);}}private TxMessage getTxMessage(String msg){JSONObject jsonObject = JSONObject.parseObject(msg);String txStr = jsonObject.getString(SeckillConstants.TX_MSG_KEY);return JSONObject.parseObject(txStr, TxMessage.class);}
}
  1. 消费者处理本地事务
@Override@Transactional(rollbackFor = Exception.class)public boolean updateAvailableStock(TxMessage txMessage) {Boolean decrementStock = distributedCacheService.hasKey(SeckillConstants.getKey(SeckillConstants.GOODS_TX_KEY, String.valueOf(txMessage.getTxNo())));if (BooleanUtil.isTrue(decrementStock)){logger.info("updateAvailableStock|秒杀商品微服务已经扣减过库存|{}", txMessage.getTxNo());return true;}boolean isUpdate = false;try{isUpdate = seckillGoodsDomainService.updateAvailableStock(txMessage.getQuantity(), txMessage.getGoodsId());//成功扣减库存成功if (isUpdate){distributedCacheService.put(SeckillConstants.getKey(SeckillConstants.GOODS_TX_KEY, String.valueOf(txMessage.getTxNo())), txMessage.getTxNo(), SeckillConstants.TX_LOG_EXPIRE_DAY, TimeUnit.DAYS);}else{//发送失败消息给订单微服务rocketMQTemplate.send(SeckillConstants.TOPIC_ERROR_MSG, getErrorMessage(txMessage));}
//            int i = 1/0;}catch (Exception e){isUpdate = false;logger.error("updateAvailableStock|抛出异常|{}|{}",txMessage.getTxNo(), e.getMessage());//发送失败消息给订单微服务rocketMQTemplate.send(SeckillConstants.TOPIC_ERROR_MSG, getErrorMessage(txMessage));}return isUpdate;}

注意:在消费者执行本地事务时,如果发生异常,则执行逆向流程

  1. 发送者监听逆向流程消息
@Component
@RocketMQMessageListener(consumerGroup = SeckillConstants.TX_ORDER_CPNSUMER_GROUP, topic = SeckillConstants.TOPIC_ERROR_MSG)
public class OrderErrorMessageListener implements RocketMQListener<String> {private final Logger logger = LoggerFactory.getLogger(OrderErrorMessageListener.class);@Autowiredprivate SeckillOrderService seckillOrderService;@Overridepublic void onMessage(String message) {logger.info("onMessage|秒杀订单微服务开始消费消息:{}", message);if (StrUtil.isEmpty(message)){return;}//删除数据库中对应的订单seckillOrderService.deleteOrder(this.getErrorMessage(message));}private ErrorMessage getErrorMessage(String msg){JSONObject jsonObject = JSONObject.parseObject(msg);String txStr = jsonObject.getString(SeckillConstants.ERROR_MSG_KEY);return JSONObject.parseObject(txStr, ErrorMessage.class);}
}

注意:该代码仅提供执行流程参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2660615.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

西北大学844计算机类考研-25级初试高分总攻略

西北大学844计算机类考研-25级初试高分攻略 个人介绍 ​ 本人是西北大学22级软件工程研究生&#xff0c;考研专业课129分&#xff0c;过去一年里在各大辅导机构任职&#xff0c;辅导考研学生专业课844&#xff0c;辅导总时长达400小时&#xff0c;辅导学生超过20余人&#xf…

展现无限创意的Photoshop 2023 Mac/win中文版:打造您的独特艺术之旅

无论您是摄影师、设计师还是艺术家&#xff0c;Photoshop 2023&#xff08;ps 2023&#xff09;都是您不可或缺的创意工具。最新升级的Photoshop 2023带来了更多令人兴奋的功能和改进&#xff0c;让您能够以前所未有的方式展现无限创意。 首先&#xff0c;Photoshop 2023拥有强…

uni-app引入vant表单(附源码)

新建项目 下载安装vant npm i vant main.js引入 import { Form } from vant; import { Field } from vant;Vue.use(Form); Vue.use(Field);代码引入 <van-form submit"onSubmit"><van-fieldclass"rePwd"v-model"username"name"请…

SpringBoot 接口对数据枚举类型的入参以及出参转换处理

目录 1、在项目中使用枚举类型2、不做任何处理的演示效果2.1、接口出参2.2、接口入参 3、用枚举的code作为参数和返回值3.1 代码案例3.1.1、定义枚举基础接口BaseEnum&#xff0c;每个枚举都实现该接口3.1.2、性别Sex枚举并实现接口BaseEnum3.1.3、定义BaseEnum枚举接口序列化3…

Python+OpenCV 零基础学习笔记(4-5):计算机图形基础+Python相对文件路径+OpenCV图像+OpenCV视频

文章目录 相关链接运行环境前言计算机图形OpenCV简单使用图形读取文件读取可能会出现的问题&#xff1a;路径不对解决方案其它路径问题解决方案 图像显示保存OpenCV视频视频素材如何获取&#xff1f;简单视频读取 相关链接 【2022B站最好的OpenCV课程推荐】OpenCV从入门到实战 …

Spring高手之路-Spring事务的传播机制(行为、特性)

目录 含义 七种事务传播机制 1.REQUIRED&#xff08;默认&#xff09; 2.REQUIRES_NEW 3.SUPPORTS 4.NOT_SUPPORTED 5.MANDATORY 6.NEVER 7.NESTED 含义 事务的传播特性指的是当一个事务方法被另一个事务方法调用时&#xff0c;这个事务方法应该如何进行&#xff1f; 七…

HTTP限流控制:Go语言中的精细把关

开场白&#xff1a;在Web应用中&#xff0c;流量控制是一个关键的防护措施&#xff0c;用于防止资源过度消耗和潜在的安全威胁。特别是在面对DDoS攻击或异常请求时&#xff0c;限流显得尤为重要。今天&#xff0c;我们将探讨如何在Go语言中实现HTTP的限流控制。 知识点一&…

elasticsearch-hadoop.jar 6.8版本编译异常

## 背景 重新编译 elasticsearch-hadoop 包&#xff1b; GitHub - elastic/elasticsearch-hadoop at 6.8 编译 7.17 版本时很正常&#xff0c;注意设置下环境变量就好&#xff0c;JAVA8_HOME/.... 编译 6.8 版本时&#xff08;要求jdk8 / jdk9&#xff09;&#xff0c;出现…

使用 Django 的异步特性提升 I/O 类操作的性能

目录 一、引言 二、Django 的异步特性 三、提升 I/O 类操作的性能 四、示例代码 五、总结 一、引言 Django 是一个高级的 Python Web 框架&#xff0c;它以快速开发和简洁的代码而闻名。然而&#xff0c;对于一些 I/O 密集型的应用程序&#xff0c;Django 的同步特性可能…

kubeadm创建k8s集群

kubeadm来快速的搭建一个k8s集群&#xff1a; 二进制搭建适合大集群&#xff0c;50台以上。 kubeadm更适合中下企业的业务集群。 部署框架 master192.168.10.10dockerkubelet kubeadm kubectl flannelnode1192.168.10.20dockerkubelet kubeadm kubectl flannelnode2192.168.1…

Linux 查看应用cpu使用情况

1、top 命令可查看当前系统所有应用cpu使用情况 2、top -H -p pid 可查看应用下线程cpu使用情况

Docker安装Grafana

1. 介绍 Grafana 是一个开源的度量分析和可视化工具&#xff0c;可以通过将采集的数据分析、查询&#xff0c;然后进行可视化的展示&#xff0c;并能实现报警。参考官网地址&#xff1a;Run Grafana Docker image | Grafana documentation 2. 安装Grafana (1) . 下载 命令&…

第二章 Eureka服务注册与发现

Eureka服务注册与发现 gitee&#xff1a;springcloud_study: springcloud&#xff1a;服务集群、注册中心、配置中心&#xff08;热更新&#xff09;、服务网关&#xff08;校验、路由、负载均衡&#xff09;、分布式缓存、分布式搜索、消息队列&#xff08;异步通信&#xff…

unity 编辑器的日志打印界面详解(有些不常见的问题)

提示&#xff1a;文章有错误的地方&#xff0c;还望诸位大神不吝指教&#xff01; 文章目录 前言一、Console界面1.Console窗口没有显示2.Clear3.Collapse4.Clear on Play5.Clear on Build6.Error Pause7.Editor1.Player Logging2.Editor3.<Enter IP> 二 搜索和过滤控制台…

实战 | 使用OpenCV快速去除文档中的表格线条(步骤 + 源码)

导 读 本文主要介绍如何使用OpenCV快速去除文档中的表格线条,并给详细步骤和代码。 背景介绍 测试图如下,目标是去除下面三张图中的表格线条,方便后续图像处理。 实现步骤 下面演示详细步骤,以图1为例: 【1】获取二值图像:加载图像、转为灰度图、OTSU二值化 i…

k8s的二进制部署(二)网络

节点部署完成之后,节点的状态都是Notready&#xff0c;所以要部署k8s网络&#xff1a; k8s的网络类型&#xff1a; k8s中的通信模式&#xff1a; pod内部之间容器与容器之间的通信。 在同一个pod中的容器共享资源和网络&#xff0c;使用同一个网络命名空间&#xff0c;可以直…

Rust安装(Windows)

安装Rust 进入Rust官网&#xff0c;下载Rustup&#xff08;Rust安装器和版本管理工具&#xff09; 下载rustup-init.exe后双击运行&#xff0c;进入以下界面&#xff1a; 1&#xff09;通过 visual studio community 安装程序快速安装 2&#xff09;手动安装必备组件 3&#x…

stm32H743编译器关于浮点类型强制转换传参的bug

局部函数&#xff0c;正常传参 当测试函数作为局部函数和main函数写在同一个文件中时&#xff0c;参数可以正常传递。函数参数和形参都为3.14 float value 0.0; void float_test(float _v) {value _v; }int main(void) {float_test(3.14f);while(1); } keil仿真截图&#…

从外网访问内网服务器:安装到使用一站通

如果你所在的是一个小的实验室&#xff0c;可能并没有大型的服务器集群而是仅是配备了小型服务器&#xff0c;日常工作便是在在局域网内访问服务器进行各项数据处理。因为在外网无法访问内网服务器&#xff0c;极大的限制了我们偶尔在外想监测一下数据的欲望。本文介绍了一种简…

实时交通标志检测和分类(代码)

交通标志检测和分类技术是一种基于计算机视觉和深度学习的先进技术&#xff0c;能够识别道路上的各种交通标志&#xff0c;并对其进行分类和识别。这项技术在智能交通系统、自动驾驶汽车和交通安全管理领域具有重要的应用前景。下面我将结合实时交通标志检测和分类的重要性、技…