目录
利用redis实现消息队列(基于list,点对点模型)——lpush存放队列(lpush 队列名 队列内容(可一次存放多个内容,用空格隔开)) brpop取队列(brpop 队列名 等待时间单位秒(一次取一个))
基于pubsub实现消息队列(发布订阅模型)
基于stream的消息队列
方法一:
存放消息
读取消息
编辑 案例
特点:
方法二:消费者组
创建
读取
java实现思路
特点
三种方式的对比:
案例,更改之前的案例
基于基于秒杀-----分布式锁----lua脚本_xzm_的博客-CSDN博客改进
利用redis实现消息队列(基于list,点对点模型)——lpush存放队列(lpush 队列名 队列内容(可一次存放多个内容,用空格隔开)) brpop取队列(brpop 队列名 等待时间单位秒(一次取一个))
优缺点:
基于pubsub实现消息队列(发布订阅模型)
优缺点:
基于stream的消息队列
方法一:
存放消息
读取消息
案例
特点:
方法二:消费者组
创建
读取
java实现思路
特点
三种方式的对比:
个人感觉:mq>stream>list>pubsub
案例,更改之前的案例
创建消息队列
修改Lua秒杀脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by Lenovo.
--- DateTime: 2023/5/30 16:55
----- 1.参数列表
-- 1.1 . 优惠卷id
local voucherId= ARGV[1]
-- 1.2. 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]-- 2.数据key
--2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3. 脚本业务
-- 3.1. 判断库存是否充足 get stockKey
if (tonumber(redis.call('get',stockKey)) <= 0 ) then-- 3.2. 库存不足,返回1return 1
end-- 3.2. 判断用户是否下单 SISMEMBER orderKey userId
if (redis.call('SISMEMBER',orderKey,userId) == 1) then-- 3.3. 存在,说明是重复下单,返回2return 2
end
-- 3.4. 扣库存 incrby stockKey -1
redis.call('incrby',stockKey, -1)
-- 3.5. 下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.6 发送消息到队列中,xadd stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
修改java代码实现发送消息
/*** 基于stream的实现秒杀* @param voucherId* @return*/public Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//生成订单idlong orderId = redisIdWorker.nextId("order");//执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,//脚本文件Collections.emptyList(),//key的集合voucherId.toString(), userId.toString() ,String.valueOf(orderId)//三个ARGV);//将返回值转换为intint i = result.intValue();//判断结果是否为零if (i != 0 ){//不为零,无法购买 ---1为库存不足,2为用户已经下单return Result.fail(i == 1 ? "库存不足" : "不能重复下单");}//可以购买//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//返回订单idreturn Result.ok(orderId);}
实现消息的消费
//创建线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{String queueName="stream.orders";@Overridepublic void run() {while (true){try {//1.获取消息队列中的订单消息,xreadgroup group g1 c1 count 1 block 2000 streams streams.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//2.判断消息是否获取成功if (list ==null || list.isEmpty()){//获取失败,进行下一次循环continue;}//3.解析消息中的订单信息MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.创建订单handleVoucherOrder(voucherOrder);//ack确认 ack stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendingList();}}}private void handlePendingList(){while (true){try {//1.获取pending-list中的订单消息,xreadgroup group g1 c1 count 1 streams streams.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));//2.判断消息是否获取成功if (list ==null || list.isEmpty()){//获取失败,退出循环break;}//3.解析消息中的订单信息MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> value = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.创建订单handleVoucherOrder(voucherOrder);//ack确认 ack stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());} catch (Exception e) {log.error("处理pending-list订单异常",e);try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}