电商系统秒杀一 秒杀的各种解决方案以及存在的问题

一 业务场景介绍

1.1 正常电商流程

在这里插入图片描述

1.2 活动和场次关系

秒杀活动表:sms_flash_promotion

DROP TABLE IF EXISTS `sms_flash_promotion`;
CREATE TABLE `sms_flash_promotion`  (`id` bigint(20) NOT NULL AUTO_INCREMENT,`title` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',`start_date` date NULL DEFAULT NULL COMMENT '开始日期',`end_date` date NULL DEFAULT NULL COMMENT '结束日期',`status` int(1) NULL DEFAULT NULL COMMENT '上下线状态,1上线、0下线',`create_time` datetime(0) NULL DEFAULT NULL COMMENT '秒杀时间段名称',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 8 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '限时购表' ROW_FORMAT = DYNAMIC;

秒杀场次表:sms_flash_promotion_session

DROP TABLE IF EXISTS `sms_flash_promotion_session`;
CREATE TABLE `sms_flash_promotion_session`  (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',`name` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '场次名称',`start_time` time(0) NULL DEFAULT NULL COMMENT '每日开始时间',`end_time` time(0) NULL DEFAULT NULL COMMENT '每日结束时间',`status` int(1) NULL DEFAULT NULL COMMENT '启用状态:0->不启用;1->启用',`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 8 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '限时购场次表' ROW_FORMAT = DYNAMIC;

场次商品关系表:sms_flash_promotion_product_relation

DROP TABLE IF EXISTS `sms_flash_promotion_product_relation`;
CREATE TABLE `sms_flash_promotion_product_relation`  (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',`flash_promotion_id` bigint(20) NULL DEFAULT NULL COMMENT '秒杀活动ID->关联sms_flash_promotion表',`flash_promotion_session_id` bigint(20) NULL DEFAULT NULL COMMENT '当前日期活动场次编号',`product_id` bigint(20) NULL DEFAULT NULL COMMENT '产品ID',`flash_promotion_price` decimal(10, 2) NULL DEFAULT NULL COMMENT '限时购价格',`flash_promotion_count` int(11) NULL DEFAULT NULL COMMENT '限时购数量',`flash_promotion_limit` int(11) NULL DEFAULT NULL COMMENT '每人限购数量',`sort` int(11) NULL DEFAULT NULL COMMENT '排序',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 44 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '商品限时购与商品关系表' ROW_FORMAT = DYNAMIC;

一个活动可以有多个场次,每个场次可以有多个商品进行秒-杀。

二 秒杀系统设计

分两部分内容;秒杀业务设计和秒杀技术实现。

2.1 秒杀业务

秒杀业务的特性:
在这里插入图片描述
秒杀业务设计:
在这里插入图片描述
在这里插入图片描述
营销工具:系统整理的促销工具,可以对某些特定的工具详细解释。
营销活动:从营销工具中提出创建一个活动。
营销活动订单:针对营销活动产生的订单

商品级优惠:
限时促销(商品级)、
限时抢购(商品级)
秒杀(商品级)
商品包邮(商品级)

订单级优惠:
满就赠(订单级)
满立减(订单级)
送优惠券(订单级)
折扣(订单级)
Vip折扣(订单级)
订单包邮(订单级)

全站促销:
优惠券
优化券补发
银行促销
支付红包
团购预售
微信砍价

商品限时秒杀(商品级别)
是一款用于常规的营销活动,在限时促销上增加“排除参与活动”、“限制用户购买次数”、“限购种类”、“未付款取消时间”、“活动商品限制库存”等功能,是限时促销促销的增强版,常用于用户拉新、日常的秒杀、日常活动。促销渠道(app,pc,wap,global_app,fresh_app)等

订单满额减(订单级别)
常用促销工具,有满X元减Y元、满X件减Y元,支持叠加满减,订单商品满减金额,支持限制用户参与次数,可设置包括享受优惠的商品分类,商品品牌,商品、促销会员等级,会员标签,促销渠道(app,pc,wap,global_app,fresh_app),订单可享受满减的支付门槛金额等,如购买全场商品,订单满100元优惠20元

银行促销(全站)
常用促销工具,与银行合作在一段时间内每周固定几天进行优惠,可设置用户总参与次数,每天总活动次数,在用户进行支付时进行减免。当前只有光大银行每周二、周六有活动,参与渠道只有pc、h5端,支持排除部分商品,通常是虚拟商品

2.1 秒杀技术

秒杀技术特性:
在这里插入图片描述
单一职责:
秒杀流量是占比比较重的一环,所以要独立部署,与其他业务分开,互不影响。扩容容易。
防止超卖:
100个库存,1000个人购买,如何保证其中100个人能买到
限流、熔断、降级:
主要是防止程序蹦掉。核心就是限制次数、限制总量、快速失败、降级运行
队列削峰:
12306中选择购票时,选择自己靠窗座位时,所有下单请求,加入队列,满满匹配撮合。
流量错峰、防刷:
使用各种手段、将流量分担到更大宽度的时间点、比如验证码、F码
预热、快速扣减:
秒杀读多写少(访问商品人数往往大于购买人数)。活动和库存都可以提前预热。比如把
数据放到redis中。
动静分离:
nginx做好动静分离、使用CDN网络、分担后端的相应压力。

三 秒杀实战

核心问题: 一个是并发读,一个是并发写;
数据库(1.2章节里已经建好表了):
秒杀场次表:sms_flash_promotion_session
秒杀活动表:sms_flash_promotion
场次商品关系表:sms_flash_promotion_product_relation

3.1 下单流程

在这里插入图片描述
下单秒杀确认接口:

 @RequestMapping(value = "/miaosha/generateConfirmOrder",method = RequestMethod.POST)@ResponseBodypublic CommonResult generateMiaoShaConfirmOrder(@RequestParam("productId") Long productId,String token,@RequestHeader("memberId") Long memberId) throws BusinessException {return secKillOrderService.generateConfirmMiaoShaOrder(productId,memberId,token);}

3.2 确认下单流程

一、检查方法:confirmCheck
1、检查本地缓存售罄状态
2、校验是否有权限购买token
3、判断redis库存是否充足
4、 检查是否正在排队当中
二、调用会员服务获取会员信息
fegin远程调用
三、产品服务获取产品信息
四、验证秒杀时间是否超时
五、获取用户收获列表
六、构建商品信息
七、计算金额
八、会员积分

下单方式:0->同步下单。1->异步下单排队中。-1->秒杀失败。>1->秒杀成功(返回订单号)

流程:
1、检查方法:confirmCheck
2、 从产品服务获取产品信息
3、 验证秒杀时间是否超时
4、调用会员服务获取会员信息
5、通过Feign远程调用 会员地址服务
6、预减库存 ####(异步流程才需要这块,数据库锁不需要 此操作)
7、生成下单商品信息
8、库存处理 ####

confirmCheck方法如下

private CommonResult confirmCheck(Long productId, Long memberId, String token) throws BusinessException {/*1、设置标记,如果售罄了在本地cache中设置为true*/Boolean localcache = cache.getCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);if (localcache != null && localcache) {return CommonResult.failed("商品已经售罄,请购买其它商品!");}/**2、 校验是否有权限购买token TODO 楼兰*//*  String redisToken = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_TOKEN_PREFIX + memberId + ":" + productId);if(StringUtils.isEmpty(redisToken) || !redisToken.equals(token)){return CommonResult.failed("非法请求,token无效!");}*///3、从redis缓存当中取出当前要购买的商品库存,RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX:miaosha:stock:cache:Integer stock = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, Integer.class);if (stock == null || stock <= 0) {/*设置标记,如果售罄了在本地cache中设置为true*/cache.setLocalCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, true);return CommonResult.failed("商品已经售罄,请购买其它商品!");}String async = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId + ":" + productId);if (async != null && async.equals("1")) {Map<String, Object> result = new HashMap<>();result.put("orderStatus", "1");//下单方式0->同步下单,1->异步下单排队中,-1->秒杀失败,>1->秒杀成功(返回订单号)return CommonResult.failed(result, "异步下单排队中");}return CommonResult.success(null);}

秒杀流程核心点为:
1、价格计算 2、库存处理

商品级别优惠计算:
在这里插入图片描述
订单级别计算优惠:
在这里插入图片描述

3.3 库存问题——超卖

高并发下会出现超卖问题。问题如下
在这里插入图片描述
线程一查询库存100个,然后进行扣减库存。
线程二查询库存也是100个,然后也进行扣减库存、
实际情况是:两个线程都扣减了库存,买了两件商品,但是库存只扣了一次,订单有两笔订单,但是库存只扣了一个。这就是库存超卖问题。

何时扣减库存:
1、下单时扣减
2、支付时扣减

3.4 库存解决

如何解决库存超卖问题,是我们秒杀非常重要的一个问题。
我们接下来会学习到用数据库的锁、用redis的特性、异步下单等解决方案来解决。

悲观锁操作:

begin;
select flash_promotion_count from sms_flash_promotion_product_relation where id=43 for UPDATE;
update sms_flash_promotion_product_relation set flash_promotion_count=flash_promotion_count-1 where id=43;
# ROLLBACK;
commit

for UPDATE:行级锁,使用不当的话会导致表级锁;sql语句不走索引的话就会把整张表锁住;

select…for update是MySQL提供的实现悲观锁的方式。此时在秒杀表中,id为43的那条数据就被我们锁定了,其它的要执行select * from 秒杀表 where id=43 for update的事务必须等本次事务提交之后才能执行。这样我们可以保证当前的数据不会被其它事务修改。

MySQL还有个问题是select…for update语句执行中所有扫描过的行都会被锁上,因此在MySQL中用悲观锁务必须确定走了索引,而不是全表扫描,否则将会将整个数据表锁住。

for update 悲观锁 行锁还有条件:就是要能查询到记录、并且走了索引才是行锁。某些情况可能是锁整张表。

因此悲观锁并不是适用于任何场景,它也存在一些不足,因为悲观锁大多数情况下依靠数据库的锁机制实现,以保证操作最大程度的独占性。如果加锁的时间过长,其他用户长时间无法访问,影响了程序的并发访问性,同时这样对数据库性能开销影响也很大,特别是对长事务而言,这样的开销往往无法承受,这时就需要乐观锁。

乐观锁操作:
乐观锁相对悲观锁而言,它认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回错误信息,让用户决定如何去做。

版本号的实现方式有两种,一个是数据版本机制,一个是时间戳机制。
在这里插入图片描述

begin;
select flash_promotion_count from sms_flash_promotion_product_relation where id=43 ;
update sms_flash_promotion_product_relation set flash_promotion_count=flash_promotion_count ,version=version+1 where id=43 and version=#version#;
# ROLLBACK;
Commit

这样除了select查询库存,还需要更新库存,其实还有插入insert order orderlog orderdetail等需要插入数据库。库存更新没问题,但是插入订单时失败了是不是要回滚,如果不在一个事务就会出错。如果在一个事务,那又涉及到事务过长甚至可能是跨库然后无法用本地事务来解决。

上边的解决方式存在的问题汇总
有三个问题、性能问题、个数问题、架构问题。

1、性能问题:
无论是悲观锁还是乐观锁对需要对数据库进行上锁,而我们数据库的资源是非常有限的。

2、个数问题:

<!--扣减库存 防止库存超卖-->
<update id="descStock">UPDATE sms_flash_promotion_product_relationSET flash_promotion_count = CASEWHEN flash_promotion_count>=#{stock} THENflash_promotion_count - #{stock}ELSEflash_promotion_countENDWHEREid = #{id}
</update>

如果库存数量只有1个了,但是现在小明下单这时要买两个,那这条sql语句就有问题了,我们库存只有一个,很明显不够卖了吧。所以这里要判断下,库存数大于购买数才能购买

3、架构问题
1000个人来抢就意味着有1000个人来请求数据库尝试扣减库存。
假设我数据库只有10减商品,意味着990个请求是没有意义的。
那这样说的话这种架构有优化的空间吧;

3.5 Redis2.0版本解决库存问题

刚才我们看了用数据库的话性能相对来说是有很大瓶颈的,瓶颈在哪儿了?我们先抛开超卖的问题,我们回到整个业务的本质来说,秒杀的场景一般都是商品比较实惠的,而大众都有贪图便宜的这个心态,那商家为了吸引顾客会以比较少的商品来吸引比较多的顾客,就是顾客多商品少,那就意味着大部分人是买不到商品的,就好比库存只有10个,但是现在有100个人购买或者1000个人准备下单购买。但是里面只有10个人才能买到。这大量的请求数据库是受不了的。

正常下单
在这里插入图片描述
预下单:
根据这种情况我们可以把库存放到redis里面,秒杀下单时,先从redis里面获取库存数量,然后根据库存数量判断是否可以进行下一步,如果有库存就直接下单,如果没有库存就不能下单。这样做的好处是什么? 可以拦截大部分流量进入到数据库中,刚才我们说过了上述的业务场景问题,简称就是狼多肉少吧,这一步我们也叫下单流程中的预下单

//3、从redis缓存当中取出当前要购买的商品库存
Integer stock = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, Integer.class);
if (stock == null || stock <= 0) {return CommonResult.failed("商品已经售罄,请购买其它商品!");
}

预售库存:
我们现在库存不从数据库里面扣减,而是从redis里面获取,那请问我们redis扣减库存这个数量从哪儿来的?

可以开一个定时任务,在开卖前几分钟或者几小时,把mysql里的数据同步到redis里;
在这里插入图片描述

//3、从redis缓存当中取出当前要购买的商品库存
Integer stock = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, Integer.class);
if (stock == null || stock <= 0) {/*设置标记,如果售罄了在本地cache中设置为true*/cache.setLocalCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, true);return CommonResult.failed("商品已经售罄,请购买其它商品!");
}
/** 订单下单前的购买与检查*/
private CommonResult confirmCheck(Long productId, Long memberId, String token) throws BusinessException {/*1、设置标记,如果售罄了在本地cache中设置为true*/Boolean localcache = cache.getCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);if (localcache != null && localcache) {return CommonResult.failed("商品已经售罄,请购买其它商品!");}//3、从redis缓存当中取出当前要购买的商品库存Integer stock = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, Integer.class);if (stock == null || stock <= 0) {/*设置标记,如果售罄了在本地cache中设置为true*/cache.setLocalCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, true);return CommonResult.failed("商品已经售罄,请购买其它商品!");}return CommonResult.success(null);}

存在的问题:
我们可以发现本地的缓存级别是jvm级别的,而各自的jvm售罄状态是不一样的,每个jvm只能修改自己本身的售罄状态,但是不能影响别的jvm状态。

3.6 解决方案

在这里插入图片描述

上边的这种情况存在问题,项目可能是集群部署,那就会导致各自的JVM获取到的售罄状态不一样,有如下几种解决方案

1、方案一 zookeeper
可以用zookeeper的watch机制来实现,让毎个jvm都监听zk的某个就节点,一旦数据有改变之后通知到其他节点上
在这里插入图片描述
原理:
在这里插入图片描述
一个jvm发现售罄时,就发消息到zk,此时监听zk的其他jvm就能感知到这个售罄的消息,这样就可以解决多个jvm售罄状态不同步的问题;

zk存在的问题:半数以上的机制,会导致延迟;
有点:高可用(也是因为半数以上机制)

2、方案二 redis
利用redis的channel机制实现(类似于消息中间件mq);

一个客户端订阅主题
在这里插入图片描述
订阅主题的命令:subscribe monkey

一个客户端向订阅的主题(channel)发送消息:
在这里插入图片描述
向主题发消息命令:publish monkey hello

此时其他订阅这个主题的客户端都能收到这个消息;

//通知服务群,清除本地售罄标记缓存if (shouldPublishCleanMsg(productId)) {redisOpsUtil.publish("cleanNoStockCache", productId);}

监听类:
监听到卖完后,就删除缓存;其实最好是改变状态(改变值),而不是清除,清除缓存的话,就会有歧义——是卖完了还是没有卖?


import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.lang.Nullable;import java.nio.charset.StandardCharsets;@Slf4j
public class RedisChannelListener implements MessageListener {@Autowiredprivate LocalCache localCache;@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {log.info("sub message :) channel[cleanNoStockCache] !");String productId = new String(message.getBody(), StandardCharsets.UTF_8);localCache.remove(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);}
}

redis这种发布与订阅是没有ack的,发出去了不会管有没有收到,这是它的不足(可靠性弱);那优点是什么?优点就是其缺点,吞吐量相当来说就会提高,因为减少了通讯(磁盘IO),那处理数据的能力就就会上升;

3、方案三 mq等其他方式
利用消息队列broker也能解决这个问题,缺点是数据要刷到磁盘,性能较低,这里不细述了;

3.7 秒杀商品的预热【product】

在项目启动的时候就把秒杀商品的库存放到redis中

import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tuling.tulingmall.component.RedisChannelListener;
import com.tuling.tulingmall.util.RedisOpsUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Slf4j
@Configuration
public class RedisConifg {@Autowiredprivate RedisConnectionFactory connectionFactory;@Bean@Primarypublic RedisTemplate<String,Object> redisTemplate(){RedisTemplate<String,Object> template = new RedisTemplate();template.setConnectionFactory(connectionFactory);// 序列化工具Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashKeySerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Beanpublic RedisOpsUtil redisOpsUtil(){return new RedisOpsUtil();}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(){RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(messageListenerAdapter(),channelTopic());return container;}@BeanMessageListenerAdapter messageListenerAdapter(){return new MessageListenerAdapter(redisChannelListener());}@BeanRedisChannelListener redisChannelListener(){return new RedisChannelListener();}@BeanChannelTopic channelTopic(){return new ChannelTopic("cleanNoStockCache");}
}

四 异步下单

之前redis方案不足: 生产环境中,数据库需要insert 很多表,所以数据库需要优化,优化思路如下
1、颠覆性 :mysql 换为oracle
2、改进型:mysql添加索引、做分库分表、读写分离等;

以上两种思路,这里暂时不讲,这里使用消息中间件来继续优化;上边已经说过了,下单时直接插入数据库是有问题的(需要insert很多表,有性能问题,而且瞬时流量太大),那么此时可以采用MQ中间件来解决,即下单时先把数据放在MQ里,再通过MQ异步插入数据库;

消息中间件三大特性:异步、解耦、削峰填谷;
在这里插入图片描述
redis里的值(下单方式):
0:同步下单
1:异步下单排队中
-1:秒杀失败
大于1:返回订单号,跳转支付页

异步下单

/*** 秒杀订单下单** @param orderParam* @param memberId* @return*/@Override//@Transactional TODO 如果是数据库控制 防止超卖。Transactional有意义吗?public CommonResult<Map<String, Object>> generateSecKillOrder(OrderParam orderParam, Long memberId, String token) throws BusinessException {Long productId = orderParam.getItemIds().get(0);CommonResult commonResult = confirmCheck(productId, memberId, token);if (commonResult.getCode() == 500) {return commonResult;}//【2】 从产品服务获取产品信息PmsProductParam product = getProductInfo(productId);//【3】 验证秒杀时间是否超时if (!volidateMiaoShaTime(product)) {return CommonResult.failed("秒杀活动未开始或已结束!");}//【4】 调用会员服务获取会员信息UmsMember member = umsMemberFeignApi.getMemberById().getData();//【5】 通过Feign远程调用 会员地址服务UmsMemberReceiveAddress address = umsMemberFeignApi.getItem(orderParam.getMemberReceiveAddressId()).getData();//预减库存if (!preDecrRedisStock(productId, product.getFlashPromotionRelationId())) {return CommonResult.failed("下单失败,已经抢购完了");}//准备创建订单//生成下单商品信息OmsOrderItem orderItem = new OmsOrderItem();orderItem.setProductId(product.getId());orderItem.setProductName(product.getName());orderItem.setProductPic(product.getPic());orderItem.setProductBrand(product.getBrandName());orderItem.setProductSn(product.getProductSn());orderItem.setProductPrice(product.getFlashPromotionPrice());orderItem.setProductQuantity(1);orderItem.setProductCategoryId(product.getProductCategoryId());orderItem.setPromotionAmount(product.getPrice().subtract(product.getFlashPromotionPrice()));orderItem.setPromotionName("秒杀特惠活动");orderItem.setGiftIntegration(product.getGiftPoint());orderItem.setGiftGrowth(product.getGiftGrowth());orderItem.setCouponAmount(new BigDecimal(0));orderItem.setIntegrationAmount(new BigDecimal(0));orderItem.setPromotionAmount(new BigDecimal(0));//支付金额BigDecimal payAmount = product.getFlashPromotionPrice().multiply(new BigDecimal(1));//优惠价格orderItem.setRealAmount(payAmount);OmsOrder order = new OmsOrder();order.setDiscountAmount(product.getPrice().subtract(product.getFlashPromotionPrice()));//折扣金额order.setFreightAmount(new BigDecimal(0));//运费金额order.setPromotionAmount(new BigDecimal(0));order.setPromotionInfo("秒杀特惠活动");order.setTotalAmount(payAmount);order.setIntegration(0);order.setIntegrationAmount(new BigDecimal(0));order.setPayAmount(payAmount);order.setMemberId(memberId);order.setMemberUsername(member.getUsername());order.setCreateTime(new Date());//设置支付方式:0->未支付,1->支付宝,2->微信order.setPayType(orderParam.getPayType());//设置支付方式:0->PC订单,1->APP订单,2->小程序order.setSourceType(0);//订单状态:0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单order.setStatus(0);//订单类型:0->正常订单;1->秒杀订单order.setOrderType(1);//用户收货信息order.setReceiverName(address.getName());order.setReceiverPhone(address.getPhoneNumber());order.setReceiverPostCode(address.getPostCode());order.setReceiverProvince(address.getProvince());order.setReceiverCity(address.getCity());order.setReceiverRegion(address.getRegion());order.setReceiverDetailAddress(address.getDetailAddress());//0->未确认;1->已确认order.setConfirmStatus(0);order.setDeleteStatus(0);//计算赠送积分order.setIntegration(product.getGiftPoint());//计算赠送成长值order.setGrowth(product.getGiftGrowth());//生成订单号-理论上唯一// order.setOrderSn(generateOrderSn(order));/*----------------------------------基本方案(下单时直接插入数据库的方案,已注释掉,采用后边的消息中间件的方式)---------------------------------------*//*try {//【悲观锁】Integer dbStock = miaoShaStockDao.selectMiaoShaStockInLock(product.getFlashPromotionRelationId());if(dbStock <= 0){return CommonResult.failed("商品已抢完!");}miaoShaStockDao.descStockInLock(product.getFlashPromotionRelationId(),dbStock-1);//【乐观锁】减库存,DB乐观锁减库存实现Integer dbStock = miaoShaStockDao.selectMiaoShaStock(product.getFlashPromotionRelationId());if(dbStock <= 0){return CommonResult.failed("商品已抢完!");}Integer id = miaoShaStockDao.descStockInVersion(product.getFlashPromotionRelationId(),dbStock,dbStock-1);if(id <= 0){return CommonResult.failed("没抢到!再接再厉!");}int resultDb = miaoShaStockDao.descStock(product.getFlashPromotionRelationId(),1);if(resultDb > 0 ){//插入订单记录orderMapper.insertSelective(order);//OrderItem关联orderItem.setOrderId(order.getId());orderItem.setOrderSn(order.getOrderSn());//插入orderItemorderItemMapper.insertSelective(orderItem);}else{return CommonResult.failed();}} catch (Exception e) {log.error("create order failure:)",e.getMessage(),e.getCause());//补回已经减掉的库存!incrRedisStock(productId);//通知服务群,清除本地售罄标记缓存redisOpsUtil.publish("cleanNoStockCache",productId);throw new BusinessException("创建订单失败!");}List<OmsOrderItem> itemList = new ArrayList<>();itemList.add(orderItem);Map<String,Object> result = new HashMap<>();result.put("order",order);result.put("orderItem",itemList);0//下单方式0->同步下单,1->异步下单排队中,-1->秒杀失败result.put("orderStatus","0");*//*******************************异步下单******************************************/OrderMessage orderMessage = new OrderMessage();orderMessage.setOrder(order);orderMessage.setOrderItem(orderItem);orderMessage.setFlashPromotionRelationId(product.getFlashPromotionRelationId());orderMessage.setFlashPromotionLimit(product.getFlashPromotionLimit());orderMessage.setFlashPromotionEndDate(product.getFlashPromotionEndDate());Map<String, Object> result = new HashMap<>();List<OmsOrderItem> itemList = new ArrayList<>();itemList.add(orderItem);result.put("order", order);result.put("orderItemList", itemList);try {//发送消息到MQboolean sendStatus = orderMessageSender.sendCreateOrderMsg(orderMessage);if (sendStatus) {/** 打上排队的标记,1:排队中*/redisOpsUtil.set(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId + ":" + productId, Integer.toString(1), 60, TimeUnit.SECONDS);/** 下单方式0->同步下单,1->异步下单排队中,-1->秒杀失败*/result.put("orderStatus", 1);} else {/** 还原预减库存*/incrRedisStock(productId);/** 清除掉本地guavacache已经售完的标记*/cache.remove(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);//通知服务群,清除本地售罄标记缓存if (shouldPublishCleanMsg(productId)) {redisOpsUtil.publish("cleanNoStockCache", productId);}result.put("orderStatus", -1);return CommonResult.failed(result, "下单失败");}} catch (Exception e) {log.error("消息发送失败:error msg:{}", e.getMessage(), e.getCause());/** 还原预减库存*/incrRedisStock(productId);/** 清除掉本地guavacache已经售完的标记*/cache.remove(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);//通知服务群,清除本地售罄标记缓存if (shouldPublishCleanMsg(productId)) {redisOpsUtil.publish("cleanNoStockCache", productId);}result.put("orderStatus", -1);return CommonResult.failed(result, "下单失败");}return CommonResult.success(result, "下单中.....");}

如果是异步下单的话,则需要定时任务来查询订单状态(前端发起的请求):

@ApiOperation("根据购物车信息生成订单")
@GetMapping("/miaosha/result")
@ResponseBody
public CommonResult miaoShaResult(@RequestParam("productId") Long productId,@RequestHeader("memberId") Long memberId){String status = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId+ ":" + productId);if(ObjectUtils.isEmpty(status)){return CommonResult.success(null,"无正在秒杀中的订单!");}if(status.equals("-1")){return CommonResult.success(status,"秒杀失败!");}if(status.equals("1")){return CommonResult.success(status,"正在排队中,请耐心等待!");}//如果Status>1,则秒杀成功,返回订单编号return CommonResult.success(status);
}

4.1 项目里涉及到的消息队列里的topic

4.1.1 处理未支付的订单(延迟消息来处理,20分钟,第五章节里还会细讲)

利用rocketmq延迟消息的一个特性来解决“定时任务”来取消订单操作。即利用MQ的延迟消息功能,消息20分钟没有被消费(没支付)的话,就把消息回滚到broker里;

Topic名字:order-status-check

生产端:

public boolean sendTimeOutOrderMessage(String cancelId){Message message = MessageBuilder.withPayload(cancelId).setHeader(RocketMQHeaders.KEYS, cancelId).build();SendResult result = rocketMQTemplate.syncSend(scheduleTopic+":"+TAG,message,5000,15);return SendStatus.SEND_OK == result.getSendStatus();
}

消费端:


import com.tuling.tulingmall.service.OmsPortalOrderService;
import com.tuling.tulingmall.service.SecKillOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 消费监听rocketmq-订单超时消息* @author yangguo*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.tulingmall.cancelGroup}", topic = "${rocketmq.tulingmall.scheduleTopic}")
public class RocketMqCancelOrderReciever implements RocketMQListener<String> {@Autowiredprivate OmsPortalOrderService omsPortalOrderService;@Autowiredprivate SecKillOrderService secKillOrderService;/*** 延时消息,取消超时订单* @param cancelId*/@Overridepublic void onMessage(String cancelId) {if(StringUtils.isEmpty(cancelId)){return;}Long orderId = Long.parseLong(cancelId.split(":")[0]);Long promotionId = Long.parseLong(cancelId.split(":")[1]);Long productId = Long.parseLong(cancelId.split(":")[2]);try {//取消的订单,释放DB库存omsPortalOrderService.cancelOrder(orderId,promotionId);//取消的订单-还原缓存库存secKillOrderService.incrRedisStock(productId);} catch (Exception e) {log.error("订单取消异常 : 还原库存失败,please check:{}",e.getMessage(),e.getCause());throw new RuntimeException();//抛异常出去,rocketmq会重新投递}}
}

取消订单逻辑

    @Overridepublic void cancelOrder(Long orderId,Long memberId) {//查询为付款的取消订单OmsOrderExample example = new OmsOrderExample();example.createCriteria().andIdEqualTo(orderId).andStatusEqualTo(0).andDeleteStatusEqualTo(0);List<OmsOrder> cancelOrderList = orderMapper.selectByExample(example);if (CollectionUtils.isEmpty(cancelOrderList)) {return;}OmsOrder cancelOrder = cancelOrderList.get(0);if (cancelOrder != null) {//修改订单状态为取消cancelOrder.setStatus(4);orderMapper.updateByPrimaryKeySelective(cancelOrder);OmsOrderItemExample orderItemExample = new OmsOrderItemExample();orderItemExample.createCriteria().andOrderIdEqualTo(orderId);List<OmsOrderItem> orderItemList = orderItemMapper.selectByExample(orderItemExample);//解除订单商品库存锁定if (!CollectionUtils.isEmpty(orderItemList)) {portalOrderDao.releaseSkuStockLock(orderItemList);}//修改优惠券使用状态updateCouponStatus(cancelOrder.getCouponId(), cancelOrder.getMemberId(), 0);//返还使用积分if (cancelOrder.getUseIntegration() != null) {//todo 这里需要做分布式事务UmsMember umsMember = umsMemberFeignApi.getMemberById().getData();umsMember.setIntegration(umsMember.getIntegration()+cancelOrder.getUseIntegration());CommonResult<String> result= umsMemberFeignApi.updateUmsMember(umsMember);if(result.getCode() == ResultCode.FAILED.getCode()) {log.warn("远程调用会员服务扣除用户积分异常");throw new RuntimeException("远程调用会员服务扣除用户积分异常");}}}}

4.1.2 Canal同步topic

Topic名字:productDetailChange

生产端:canal

消费端:


import com.alibaba.otter.canal.protocol.FlatMessage;
import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import com.tuling.tulingmall.domain.PmsProductParam;
import com.tuling.tulingmall.util.ClassUtil;
import com.tuling.tulingmall.util.RedisOpsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** @description: 产品表信息修改-同步更新到Redis-Cache,ELK,Hadoop,相关下游服务**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.canal.topic}",consumerGroup = "${rocketmq.canal.group}")
public class RefreshCacheListener implements RocketMQListener<FlatMessage> {@Autowiredprivate RedisOpsUtil redisOpsUtil;private final static String PRODUCT = "pms_product";private final static String SKU = "pms_sku_stock";/*** 异步下单消费消息* @param flatMessage*/@Overridepublic void onMessage(FlatMessage flatMessage) {log.info("database:{},event-type:{},old-row-data:{},new-row-data:{}",flatMessage.getDatabase(),flatMessage.getType(),flatMessage.getOld(),flatMessage.getData());//修改后的新记录List<Map<String, String>> records = flatMessage.getData();//修改前的数据List<Map<String, String>> old = flatMessage.getOld();switch (flatMessage.getType().toUpperCase()){case "UPDATE":updateCache(records,old,flatMessage.getTable());break;case "DELETE":records.stream().forEach((item)->{//删除缓存redisOpsUtil.delete(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + item.get("id"));});break;}}public void updateCache(List<Map<String, String>> records,List<Map<String, String>> old,String table){int index = 0;/** 被更改的Row所有更改的行*/for(Map<String,String> row : old){Map<String, String> currentRow = records.get(index);String redisKey = RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + getProductId(currentRow,table);PmsProductParam product = redisOpsUtil.get(redisKey,PmsProductParam.class);if(!ObjectUtils.isEmpty(product)){Iterator<Map.Entry<String, String>> iterator = row.entrySet().iterator();while(iterator.hasNext()){Map.Entry entry = iterator.next();String key = (String) entry.getKey();//刷新产品数据product = refresh(product,table,key,currentRow);}/** 更新缓存内容,并设置过期时间*/long expired = redisOpsUtil.getExpired(redisKey, TimeUnit.SECONDS);redisOpsUtil.set(redisKey,product,expired,TimeUnit.SECONDS);}++index;}}/*** 更新缓存数据* @param product* @param table* @param key* @param currentRow* @return*/private PmsProductParam refresh(PmsProductParam product,String table,String key,Map<String, String> currentRow){if(PRODUCT.equals(table)){ClassUtil.callSetterMethod(product,ClassUtil.getSetterMethodName(key),currentRow.get(key));}else if(SKU.equals(table)){product.getSkuStockList().stream().forEach((item)->{if(item.getId() == Long.parseLong(currentRow.get("id"))){ClassUtil.callSetterMethod(item,ClassUtil.getSetterMethodName(key),currentRow.get(key));}});}return product;}/** 获取产品ID*/private String getProductId(Map<String, String> row,String table){if(PRODUCT.equals(table)){return row.get("id");}else{return row.get("product_id");}}}

4.1.3 async-order异步下单topic:

实际生产订单(发送端):SecKillOrderServiceImpl#asyncCreateOrder

@Transactional//这里可以使用分布式事物来优化public Long asyncCreateOrder(OmsOrder order, OmsOrderItem orderItem, Long flashPromotionRelationId) {//减库存Integer result = miaoShaStockDao.descStock(flashPromotionRelationId, 1);if (result <= 0) {throw new RuntimeException("没抢到!");}//插入订单记录orderMapper.insertSelective(order);//OrderItem关联orderItem.setOrderId(order.getId());orderItem.setOrderSn(order.getOrderSn());//插入orderItemorderItemMapper.insertSelective(orderItem);/** 如果订单创建成功,需要发送定时消息,20min后如果没有支付,则取消当前订单,释放库存*/try {boolean sendStatus = orderMessageSender.sendTimeOutOrderMessage(order.getId() + ":" + flashPromotionRelationId + ":" + orderItem.getProductId());if (!sendStatus) {throw new RuntimeException("订单超时取消消息发送失败!");}} catch (Exception e) {throw new RuntimeException("订单超时取消消息发送失败!");}return order.getId();}

处理订单(消费端):com.tuling.tulingmall.component.rocketmq.AscynCreateOrderReciever


import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import com.tuling.tulingmall.component.LocalCache;
import com.tuling.tulingmall.domain.OrderMessage;
import com.tuling.tulingmall.service.SecKillOrderService;
import com.tuling.tulingmall.util.RedisOpsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @description: 消费监听rocketmq-订单消息**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.tulingmall.asyncOrderTopic}",consumerGroup = "${rocketmq.tulingmall.asyncOrderGroup}")
public class AscynCreateOrderReciever implements RocketMQListener<OrderMessage> {@Autowiredprivate SecKillOrderService secKillOrderService;@Autowiredprivate RedisOpsUtil redisOpsUtil;@Autowiredprivate LocalCache<Object> cache;/*** 异步下单消费消息* @param orderMessage*/@Overridepublic void onMessage(OrderMessage orderMessage) {log.info("listen the rocketmq message");Long memberId = orderMessage.getOrder().getMemberId();Long productId = orderMessage.getOrderItem().getProductId();//订单编号,分库分表不用该编号做订单结果标记String orderSn = orderMessage.getOrder().getOrderSn();Integer limit = orderMessage.getFlashPromotionLimit();Date endDate = orderMessage.getFlashPromotionEndDate();try {Long orderId = secKillOrderService.asyncCreateOrder(orderMessage.getOrder(),orderMessage.getOrderItem(),orderMessage.getFlashPromotionRelationId());//更改排队标记状态,代表已经下单成功,ID设置为snowflake后,用ID作为状态标记redisOpsUtil.set(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId+ ":" + productId,orderId.toString(),60L, TimeUnit.SECONDS);/** 设置用户购买次数,(不限制购买次数了,需要可自行放开此处,* 并在secKillOrderService.checkConfirm中加入验证)*//*Integer rebuy = redisOpsUtil.get(RedisKeyPrefixConst.MEMBER_BUYED_MIAOSHA_PREFIX + memberId + ":" + productId,Integer.class);if(rebuy != null){redisOpsUtil.decr(RedisKeyPrefixConst.MEMBER_BUYED_MIAOSHA_PREFIX + memberId + ":" + productId);}else{//剩余时间Date now = new Date();Long expired = endDate.getTime()-now.getTime();//打上购买次数标记redisOpsUtil.set(RedisKeyPrefixConst.MEMBER_BUYED_MIAOSHA_PREFIX + memberId + ":" + productId,limit-1,expired,TimeUnit.MILLISECONDS);}*/} catch (Exception e) {log.error(e.getMessage(),e.getCause());/** 下单失败*/redisOpsUtil.set(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId+ ":" + productId,Integer.toString(-1),60L, TimeUnit.SECONDS);//还原预减库存secKillOrderService.incrRedisStock(productId);//清除掉本地guava-cache已经售完的标记cache.remove(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);//通知服务群,清除本地售罄标记缓存if(secKillOrderService.shouldPublishCleanMsg(productId)) {redisOpsUtil.publish("cleanNoStockCache", productId);}}}}

目前发送端与消费端在一个项目里,实际部署时不能这么玩,需要拆分;

4.2 异步订单查询接口

如果是异步下单的话,则需要定时任务来查询订单状态;

@ApiOperation("根据购物车信息生成订单")
@GetMapping("/miaosha/result")
@ResponseBody
public CommonResult miaoShaResult(@RequestParam("productId") Long productId,@RequestHeader("memberId") Long memberId){String status = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId+ ":" + productId);if(ObjectUtils.isEmpty(status)){return CommonResult.success(null,"无正在秒杀中的订单!");}if(status.equals("-1")){return CommonResult.success(status,"秒杀失败!");}if(status.equals("1")){return CommonResult.success(status,"正在排队中,请耐心等待!");}//如果Status>1,则秒杀成功,返回订单编号return CommonResult.success(status);
}

总结:

  1. 异步下单可以分流、让服务器处理的压力变小、数据库压力减少(处理库存与处理订单的业务分开)
  2. 解耦的话,业务更加清晰。
  3. 天然的排队处理能力。
  4. 消息中间件有很多特性可以利用,比如订单取消。

五 订单取消

订单超时取消,回滚库存:
com.tuling.tulingmall.component.rocketmq.RocketMqCancelOrderReciever

import com.tuling.tulingmall.service.OmsPortalOrderService;
import com.tuling.tulingmall.service.SecKillOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 消费监听rocketmq-订单超时消息* @author yangguo*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.tulingmall.cancelGroup}", topic = "${rocketmq.tulingmall.scheduleTopic}")
public class RocketMqCancelOrderReciever implements RocketMQListener<String> {@Autowiredprivate OmsPortalOrderService omsPortalOrderService;@Autowiredprivate SecKillOrderService secKillOrderService;/*** 延时消息,取消超时订单* @param cancelId*/@Overridepublic void onMessage(String cancelId) {if(StringUtils.isEmpty(cancelId)){return;}Long orderId = Long.parseLong(cancelId.split(":")[0]);Long promotionId = Long.parseLong(cancelId.split(":")[1]);Long productId = Long.parseLong(cancelId.split(":")[2]);try {//取消的订单,释放DB库存omsPortalOrderService.cancelOrder(orderId,promotionId);//取消的订单-还原缓存库存secKillOrderService.incrRedisStock(productId);} catch (Exception e) {log.error("订单取消异常 : 还原库存失败,please check:{}",e.getMessage(),e.getCause());throw new RuntimeException();//抛异常出去,rocketmq会重新投递}}
}

定时任务处理取消订单存在的问题:
1、11点启动定时任务,毎半个小时扫描数据库一次,我们发现在11:01分下的单并不能30分钟之后失效,而是要到12点也就是定时任务第三次扫扫描数据库才能让订单失效。
2、定时扫数据库的话消耗性能也很大,自然效率也会很低。对数据库压力太大。
3、定时任务的话,集群还需要保证处理的幂等性和分布式问题。这也给系统带来了很多的负担。

在这里插入图片描述
下边这种方式,并发的时候,不安全
在这里插入图片描述
加锁:每个定时任务先去拿锁,性能不好
在这里插入图片描述
异步取消订单:
com.tuling.tulingmall.component.rocketmq.RocketMqCancelOrderReciever


import com.tuling.tulingmall.service.OmsPortalOrderService;
import com.tuling.tulingmall.service.SecKillOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 消费监听rocketmq-订单超时消息* @author yangguo*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.tulingmall.cancelGroup}", topic = "${rocketmq.tulingmall.scheduleTopic}")
public class RocketMqCancelOrderReciever implements RocketMQListener<String> {@Autowiredprivate OmsPortalOrderService omsPortalOrderService;@Autowiredprivate SecKillOrderService secKillOrderService;/*** 延时消息,取消超时订单* @param cancelId*/@Overridepublic void onMessage(String cancelId) {if(StringUtils.isEmpty(cancelId)){return;}Long orderId = Long.parseLong(cancelId.split(":")[0]);Long promotionId = Long.parseLong(cancelId.split(":")[1]);Long productId = Long.parseLong(cancelId.split(":")[2]);try {//取消的订单,释放DB库存omsPortalOrderService.cancelOrder(orderId,promotionId);//取消的订单-还原缓存库存secKillOrderService.incrRedisStock(productId);} catch (Exception e) {log.error("订单取消异常 : 还原库存失败,please check:{}",e.getMessage(),e.getCause());throw new RuntimeException();//抛异常出去,rocketmq会重新投递}}}

创建订单、发送延迟20分钟消息:

com.tuling.tulingmall.service.impl.SecKillOrderServiceImpl#asyncCreateOrder>com.tuling.tulingmall.component.rocketmq.OrderMessageSender#sendTimeOutOrderMessage
/** 如果订单创建成功,需要发送定时消息,20min后如果没有支付,则取消当前订单,释放库存*/
try {boolean sendStatus = orderMessageSender.sendTimeOutOrderMessage(order.getId() + ":" + flashPromotionRelationId + ":" + orderItem.getProductId());if (!sendStatus) {throw new RuntimeException("订单超时取消消息发送失败!");}
} catch (Exception e) {throw new RuntimeException("订单超时取消消息发送失败!");
}

预减库存preDecrRedisStock方法:
通过redis的decr函数扣减库存。
如果没有库存了,stock小于0时 发消息给rocketmq同步库存 redis设置为0
redis与db同步订单:
com.tuling.tulingmall.component.rocketmq.OrderMessageSender#sendStockSyncMessage

/*** 发送延时同步库存消息,60s后同步库存* @param productId* @param promotionId* @return*/public boolean sendStockSyncMessage(Long productId,Long promotionId){Message message = MessageBuilder.withPayload(productId+":"+promotionId).build();SendResult result = rocketMQTemplate.syncSend("stock-sync",message,5000,5);return SendStatus.SEND_OK == result.getSendStatus();}

六 RocketMQ消息

消息零丢失
生产端:同步发送消息、重试机制、事务消息、状态
服务端:刷盘存储(持久化)、主从同步、 状态返回(持久化、主从同步等成功后才返回状态)
消费端:pull broker offset(队列里有offset,偏移量) 消费端完全消费完消息后并且返回成功的情况,才会改变offset偏移值,如果消费失败则下次消费的还是之前那条数据;

RocketMQ消息不被重复消费
由于有重试机制,所以会导致消费重复的问题、也就是幂等性问题。
使用redis incr 自增机制来解决:
假如订单id,orderid为20250101 ,那就把20250101 作为key,处理完后自增为1,即此时value为1;如果下次这个订单又进来了(重复消费),去查value为1,说明已经处理过了,本次就不处理了;

或者,数据库唯一主键也能解决;
数据同步Canal
例如MQ与mysql如何做到数据同步(一般以mysql数据为主);
场景模拟:在秒杀后台把价格修改之后,如何同步到缓存中,比如redis如何同步mysql数据;
Canal是阿里的一款开源产品,canal安装与使用

Canal不适合集成秒杀库存,因为Canal不适合做更新频繁的业务;

项目中对product和秒杀表修改做同步操作:
com.tuling.tulingmall.mq.RefreshCacheListener#onMessage


import com.alibaba.otter.canal.protocol.FlatMessage;
import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import com.tuling.tulingmall.domain.PmsProductParam;
import com.tuling.tulingmall.util.ClassUtil;
import com.tuling.tulingmall.util.RedisOpsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** @description: 产品表信息修改-同步更新到Redis-Cache,ELK,Hadoop,相关下游服务**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.canal.topic}",consumerGroup = "${rocketmq.canal.group}")
public class RefreshCacheListener implements RocketMQListener<FlatMessage> {@Autowiredprivate RedisOpsUtil redisOpsUtil;private final static String PRODUCT = "pms_product";private final static String SKU = "pms_sku_stock";/*** 异步下单消费消息(由于集成了Canal,所以当数据库数据发生变化时,这里就会收到消息)* @param flatMessage*/@Overridepublic void onMessage(FlatMessage flatMessage) {log.info("database:{},event-type:{},old-row-data:{},new-row-data:{}",flatMessage.getDatabase(),flatMessage.getType(),flatMessage.getOld(),flatMessage.getData());//修改后的新记录List<Map<String, String>> records = flatMessage.getData();//修改前的数据List<Map<String, String>> old = flatMessage.getOld();switch (flatMessage.getType().toUpperCase()){case "UPDATE":updateCache(records,old,flatMessage.getTable());break;case "DELETE":records.stream().forEach((item)->{//删除缓存redisOpsUtil.delete(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + item.get("id"));});break;}}public void updateCache(List<Map<String, String>> records,List<Map<String, String>> old,String table){int index = 0;/** 被更改的Row所有更改的行*/for(Map<String,String> row : old){Map<String, String> currentRow = records.get(index);String redisKey = RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + getProductId(currentRow,table);PmsProductParam product = redisOpsUtil.get(redisKey,PmsProductParam.class);if(!ObjectUtils.isEmpty(product)){Iterator<Map.Entry<String, String>> iterator = row.entrySet().iterator();while(iterator.hasNext()){Map.Entry entry = iterator.next();String key = (String) entry.getKey();//刷新产品数据product = refresh(product,table,key,currentRow);}/** 更新缓存内容,并设置过期时间*/long expired = redisOpsUtil.getExpired(redisKey, TimeUnit.SECONDS);redisOpsUtil.set(redisKey,product,expired,TimeUnit.SECONDS);}++index;}}/*** 更新缓存数据* @param product* @param table* @param key* @param currentRow* @return*/private PmsProductParam refresh(PmsProductParam product,String table,String key,Map<String, String> currentRow){if(PRODUCT.equals(table)){ClassUtil.callSetterMethod(product,ClassUtil.getSetterMethodName(key),currentRow.get(key));}else if(SKU.equals(table)){product.getSkuStockList().stream().forEach((item)->{if(item.getId() == Long.parseLong(currentRow.get("id"))){ClassUtil.callSetterMethod(item,ClassUtil.getSetterMethodName(key),currentRow.get(key));}});}return product;}/** 获取产品ID*/private String getProductId(Map<String, String> row,String table){if(PRODUCT.equals(table)){return row.get("id");}else{return row.get("product_id");}}}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2868979.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言步行梯】各类操作符、类型转换与原码、反码、补码详谈

&#x1f3af;每日努力一点点&#xff0c;技术进步看得见 &#x1f3e0;专栏介绍&#xff1a;【C语言步行梯】专栏用于介绍C语言相关内容&#xff0c;每篇文章将通过图片代码片段网络相关题目的方式编写&#xff0c;欢迎订阅~~ 文章目录 算术运算符原码、反码、补码介绍移位运算…

sparksession对象简介

什么是sparksession对象 spark2.0之后&#xff0c;sparksession对象是spark编码的统一入口对象&#xff0c;通常我们在rdd编程时&#xff0c;需要SparkContext对象作为RDD编程入口&#xff0c;但sparksession对象既可以作为RDD编程对象入口&#xff0c;在sparkcore编程中可以通…

react可视化编辑器 第二章 自由拖动

完整代码 这里介绍 currentDiv 和 useRef的俩中用法&#xff0c;看自己需求使用 import React, {useState,DragEvent,useRef,useEffect,MouseEvent, } from react;interface Demo {id: number;x: number;y: number; }const App: React.FC () > {const [demos, setDemos] u…

RabbitMQ学习总结-基础篇

1..RabbitMQ 本身是一个消息中间件&#xff0c;在服务应用中&#xff0c;可解决高性能&#xff0c;高并发&#xff0c;高应用的问题&#xff0c;极大程度上解决了应用的性能问题。 2.MQ的使用分为生产者和消费者&#xff0c;生产者生产消息&#xff0c;消费者去消费消息。 3.…

管理类联考–复试–政治--二十大--记忆宫殿

文章目录 整体记忆宫殿门床头柜床书桌阳台 口诀记忆法 整体 记忆宫殿 要有逻辑的放到房间了 何为逻辑&#xff0c;如下大佬总结的便是&#xff0c;或者可自行总结&#xff0c;有前后顺序&#xff0c;做事逻辑即可 第一步&#xff1a;将逻辑的点放到房间里的点&#xff0c;…

从零开始搭建游戏服务器 第二节 Actor模型与应用

目录 复习本节内容正文什么是Actor模型如何应用创建Actor基类创建RootActor创建AkkaContext创建ConnectActorManager和ConnectActor生成actor并发送消息给它 课后作业结尾 复习 上一节我们使用gradle构建了一个多模块系统。 并且在登录服启动了Netty服务&#xff0c;监听confi…

渗透测试框架权限维持技术——Persistence模块

测试环境&#xff1a; kali win7 测试步骤&#xff1a; 1.利用MSF编写远控程序 msfvenom -p windows/meterpreter/reverse_tcp lhost10.0.0.163 lport55555 -f exe -o 5555.exe-p 漏洞利用payload lhost 监听地址&#xff08;kali地址&#xff09; lport 监听端口&#xf…

劲仔食品三年倍增,抢先打响鹌鹑蛋“健康”属性品牌之争?

如果说&#xff0c;进入2024年后&#xff0c;在股价继续陷入回调状态的食品板块中有个股走势表现相对亮眼&#xff0c;那么劲仔食品必是其中之一。 从去年发布2023年三季度业绩公告以来&#xff0c;其强劲的业绩表现就带动了股价走出小趋势。2023年10月23日至今2024年3月13日收…

Spring框架-上篇

预备知识&#xff1a;Maven基础 目录 Spring课程介绍为什么学学什么怎么学将学习的Spring技术 Spring Framework系统架构Spring Framework系统架构图Spring Framework学习线路 核心概念小结IoC案例Io入门案例思路分析Ioc入门案例(XML版) DI入门案例DI入门案例思路分析DI入门案…

关于UE的相机震动CameraShake

创建CameraShake资源 CameraShake配置是个蓝图类&#xff0c;我们选择创建BlueprintClass&#xff0c;父类选择CameraShakeBase即可。 参数调整 目前主要用到了 LocationAmplitudeMultiplier 1 LocationFrequencyMultiplier 10 RotationAmplitudeMultiplier 1 Rotation…

云服务器2核4G能支持多少人同时访问?拿本记上!

腾讯云轻量2核4G5M带宽服务器支持多少人在线访问&#xff1f;5M带宽下载速度峰值可达640KB/秒&#xff0c;阿腾云以搭建网站为例&#xff0c;假设优化后平均大小为60KB&#xff0c;则5M带宽可支撑10个用户同时在1秒内打开网站&#xff0c;并发数为10&#xff0c;经阿腾云测试&a…

使用Python IDLE进行Debug调试

1.首先以我的Python版本为例为大家讲解&#xff0c;我的版本是Python 3.7&#xff0c;版本问题对使用情况影响不大。 2.接着我们可以通过新建文件夹来输入我们的代码或者打开我们已有的代码 这里我直接打开已有的代码效果如图&#xff0c;接下来我们如何使用Debug呢&#xff1…

【LLM】LLama2模型(RMSNorm、SwiGLU、RoPE位置编码)

note 预训练语言模型除了自回归&#xff08;Autoregressive&#xff09;模型GPT&#xff0c;还有自编码模型&#xff08;Autoencoding&#xff09;BERT[1]、编-解码&#xff08;Encoder-Decoder&#xff09;模型BART[67]&#xff0c;以及融合上述三种方法的自回归填空&#xf…

【视频图像取证篇】模糊图像增强技术之深度转化类滤波场景应用小结

【视频图像取证篇】模糊图像增强技术之深度转化类滤波场景应用小结 模糊图像增强技术之深度转化类滤波场景应用小结—【蘇小沐】 &#xff08;一&#xff09;转化类滤波器&#xff08;Convert to filter&#xff09; 1、灰度滤波器&#xff08;Gray filter&#xff09; 灰度…

stm32学习——串口通信中的奇偶校验位

常用的校验算法有奇偶校验、校验和、CRC&#xff0c;还有LRC、BCC等不常用的校验算法。 以串口通讯中的奇校验为例&#xff0c;如果数据中1的个数为奇数&#xff0c;则奇校验位0&#xff0c;否则为1。 例如原始数据为&#xff1a;0001 0011&#xff0c;数据中1的个数&#xf…

STM32-Flash闪存

简介 STM32F1系列的FLASH包含程序存储器、系统存储器和选项字节三个部分&#xff0c;通过闪存存储器接口&#xff08;外设&#xff09;可以对程序存储器和选项字节进行擦除和编程。 读写Flash的用途 1.利用程序存储器的剩余空间来保存掉电不丢失的用户数据。 2.通过在程序中…

springboot“涛宝”大学生二手物品交易商城

摘 要 二十一世纪我们的社会进入了信息时代&#xff0c;信息管理系统的建立&#xff0c;大大提高了人们信息化水平。传统的管理方式对时间、地点的限制太多&#xff0c;而在线管理系统刚好能满足这些需求&#xff0c;在线管理系统突破了传统管理方式的局限性。于是本文针对这一…

SwiftUI的 特性 - ViewModify

SwiftUI的 特性 - ViewModify 记录一下SwiftUI的 特性 - ViewModify的使用方式 可以通过viewModify来管理视图的样式&#xff0c;结合extension来完成封装达到解偶效果 import SwiftUI/// 我们可以通过viewModify来管理视图的样式&#xff0c;来达到解偶效果 struct DefaultB…

5_springboot_shiro_jwt_多端认证鉴权_禁用Cookie

1. Cookie是什么 ​ Cookie是一种在客户端&#xff08;通常是用户的Web浏览器&#xff09;和服务器之间进行状态管理的技术。当用户访问Web服务器时&#xff0c;服务器可以向用户的浏览器发送一个名为Cookie的小数据块。浏览器会将这个Cookie存储在客户端&#xff0c;为这个Co…

都2024年了,你还在用两个手指在电脑键盘上打字吗?

前言 前段时间突然想起来一件很有意思的事情&#xff1a;一个找平面设计岗位的应届生&#xff0c;使用电脑的时候居然还在用两个手指打字。 想起这个事情的时候&#xff0c;并不是想嘲笑谁。 准备步入大学或者准备步入职场的小伙伴们&#xff0c;既然找的工作基本上是要接触电…