Redis 实现的延时队列组件

最近看开源看到一个好用的延时队列组件,已经上生产。代码量很少,主要就是利用Redis监听过期键实现的。然后搞点策略模式柔和柔和。利用Spring Start 封装了一下,全是俺掌握的知识,稍微研究了下就搞懂了。觉得挺有用的,这里分享一下。

Redis 过期键监听

之前写责任链手撸二级缓存的时候,也是借助过期键监听器来更新二级缓存的,详情挪步
CaffeineCache+Redis 接入系统做二层缓存,SPI 思路实现(借鉴 mybatis 二级缓存、自动装配源码)

效果

效果前提: Redis 开启了过期键通知:config set notify-keyspace-events Ex

根据 code 值发布延时任务(10s)。
在这里插入图片描述
对应的code 的处理器,10s后收到通知进行处理任务

在这里插入图片描述
在这里插入图片描述

基于这套组件可实现的功能:订单超时自动取消、会议前 30 分钟自动提醒、订单到点自动收货等,比MQ灵活性更高,RocketMq 老版只支持最高30 分钟的延时任务,这套组件可以指定任意时间。且可无限扩展 topic,满足不同类型的业务。缺点就是严重依赖Redis,需要保证Redis的高可用

RedisExpiredListener配置

利用ApplicationContextAware注册所有的messageHandleRouter处理器,当有消息过来时解析消息格式中的CODE,根据CODE把任务分发给具体的某个messageHandleRouter实现类进行处理。进行业务隔离。

package com.zzh.mybatisplus5.mq;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;import java.util.HashMap;public class RedisExpiredListener implements MessageListener, ApplicationContextAware {/*** 客户端监听订阅的topic,当有消息的时候,会触发该方法;* 并不能得到value, 只能得到key。* 姑且理解为: redis服务在key失效时(或失效后)通知到java服务某个key失效了, 那么在java中不可能得到这个redis-key对应的redis-value。*/protected HashMap<Integer, DelayedMessageHandler> handlerRouter;private static final Logger logger = LoggerFactory.getLogger(RedisExpiredListener.class);@Overridepublic void onMessage(Message message, byte[] bytes) {String expiredKey = message.toString();// TASK:CODE:VALUE结构String[] split = expiredKey.split(":");if (split.length < 2 || !expiredKey.startsWith("TASK:")) {return;}logger.info("[Redis键失效通知] key=" + expiredKey);StringBuilder value = new StringBuilder();for (int i = 2; i < split.length; i++) {value.append(split[i]);if (i != split.length - 1) {value.append(":");}}int code = Integer.parseInt(split[1]);DelayedMessageHandler handler = handlerRouter.get(code);if (handler != null) {handler.handle(value.toString());}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.handlerRouter = (HashMap<Integer, DelayedMessageHandler>) applicationContext.getBean("messageHandleRouter");}
}

DelayedMessageQueue实现类

基础配置类,里面配置了监听哪个 Redis 库的过期事件

package com.zzh.mybatisplus5.mq;import com.zzh.mybatisplus5.component.CacheComponent;
import org.springframework.beans.factory.annotation.Autowired;import java.util.concurrent.Callable;public class RedisNotifyDelayedMessageQueueImpl implements DelayedMessageQueue {@Autowiredprivate CacheComponent cacheComponent;@Overridepublic Boolean publishTask(Integer code, String value, Integer delay) {if (delay < 0) {delay = 1;}cacheComponent.putRaw(assembleKey(code, value), "", delay);return true;}@Overridepublic Boolean deleteTask(Integer code, String value) {cacheComponent.del(assembleKey(code, value));return true;}@Overridepublic Long getTaskTime(Integer code, String value) {return cacheComponent.getKeyExpire(assembleKey(code, value));}@Overridepublic Boolean publishTask(Callable task, Integer delay) {throw new RuntimeException();}public String assembleKey(Integer code, String value) {if (value == null) {value = "";}StringBuilder sb = new StringBuilder("TASK:");sb.append(code + ":");sb.append(value);return sb.toString();}
}

Redis 配置

package com.zzh.mybatisplus5.mq;import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;@Configuration
public class RedisAutoConfig {private static final Logger logger = LoggerFactory.getLogger(RedisAutoConfig.class);@Value("${spring.redis.database}")private Integer cacheDB;@Beanpublic Map<Integer, DelayedMessageHandler> messageHandleRouter(List<DelayedMessageHandler> delayedMessageHandlerList) {return delayedMessageHandlerList.stream().collect(Collectors.toMap(DelayedMessageHandler::getCode, v -> v));}@Beanpublic RedisExpiredListener redisExpiredListener() {return new RedisExpiredListener();}/*** 指定 redis 库运行 config set notify-keyspace-events Ex 即可,不然监听无法生效* redis服务端需要配置 notify-keyspace-events 参数 ,至少包含k或者e* K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀* E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀* g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知* $ 字符串命令的通知* l 列表命令的通知* s 集合命令的通知* h 哈希命令的通知* z 有序集合命令的通知* x 过期事件:每当有过期键被删除时发送* e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送* A 参数 g$lshzxe 的别名** @后边可以指定db库,*代表所有库,0代表0库 __keyevent@0__:expired 0库过期的数据* __keyspace@0__:mykey   0库mykey这个键的所有操作* __keyevent@0__:del     0库所有del这个命令*/@Beanpublic RedisMessageListenerContainer container(LettuceConnectionFactory defaultLettuceConnectionFactory, RedisExpiredListener expiredListener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(defaultLettuceConnectionFactory);//监听指定库的过期keycontainer.addMessageListener(expiredListener, new PatternTopic("__keyevent@" + cacheDB + "__:expired"));return container;}@Beanpublic DelayedMessageQueue delayedMessageQueue() {return new RedisNotifyDelayedMessageQueueImpl();}@Beanpublic LettuceConnectionFactory defaultLettuceConnectionFactory(RedisConfiguration defaultRedisConfig,GenericObjectPoolConfig defaultPoolConfig) {LettuceClientConfiguration clientConfig =LettucePoolingClientConfiguration.builder().commandTimeout(Duration.ofMillis(5000)).poolConfig(defaultPoolConfig).build();return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);}@Beanpublic RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory defaultLettuceConnectionFactory) {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);return redisTemplate;}@Beanpublic StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory defaultLettuceConnectionFactory) {StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();stringRedisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);return stringRedisTemplate;}@Configurationpublic static class DefaultRedisConfig {@Value("${spring.redis.master-name}")private String masterName;@Value("${spring.redis.mode}")private String mode;@Value("${spring.redis.host:127.0.0.1:6379}")private String host;@Value("${spring.redis.password:}")private String password;@Value("${spring.redis.database:0}")private Integer database;@Value("${spring.redis.lettuce.pool.max-active:8}")private Integer maxActive;@Value("${spring.redis.lettuce.pool.max-idle:8}")private Integer maxIdle;@Value("${spring.redis.lettuce.pool.max-wait:-1}")private Long maxWait;@Value("${spring.redis.lettuce.pool.min-idle:0}")private Integer minIdle;@Beanpublic GenericObjectPoolConfig defaultPoolConfig() {GenericObjectPoolConfig config = new GenericObjectPoolConfig();config.setMaxTotal(maxActive);config.setMaxIdle(maxIdle);config.setMinIdle(minIdle);config.setMaxWaitMillis(maxWait);return config;}@Beanpublic RedisConfiguration defaultRedisConfig() {return getRedisConfiguration(masterName, mode, host, password, database);}}private static RedisConfiguration getRedisConfiguration(String masterName, String mode, String host, String password, Integer database) {if (mode.equals("single")) {RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();String[] hostArray = host.split(":");config.setHostName(hostArray[0]);config.setPassword(RedisPassword.of(password));config.setPort(Integer.parseInt(hostArray[1]));config.setDatabase(database);return config;} else if (mode.equals("sentinel")) {RedisSentinelConfiguration configuration = new RedisSentinelConfiguration();configuration.setMaster(masterName);String[] hostList = host.split(",");List<RedisNode> serverList = new LinkedList<>();for (String hostItem : hostList) {String[] hostArray = hostItem.split(":");RedisServer redisServer = new RedisServer(hostArray[0], Integer.parseInt(hostArray[1]));serverList.add(redisServer);}configuration.setSentinels(serverList);logger.info("[Redis] 哨兵节点: masterName={}, host={}", masterName, host);return configuration;} else {return null;}}}

顶级策略接口

没啥好说的,老三样

package com.zzh.mybatisplus5.mq;import java.util.concurrent.Callable;public interface DelayedMessageQueue {/*** 添加延迟秒数,RingDelayedMessageQueueImpl专用,单机实现* * @param delay seconds* @return*/public Boolean publishTask(Callable task, Integer delay);/*** RedisNotifyDelayedMessageQueueImpl专用,集群实现* 这两个都会被拼接为 TASK:(随机码):CODE:VALUE 当成key存入redis中,因为回调时只会返回key,而不会返回key对应的值* @param code 回调时用来选择的Handler的CODE* @param value 回调时使用的值* @param delay 多少秒后调用* @return*/public Boolean publishTask(Integer code, String value, Integer delay);/*** 删除已有任务* @param code* @param value* @return*/public Boolean deleteTask(Integer code, String value);/*** 获取指定任务还有多少时间执行,如果不存在,返回-2* @param code* @param value* @return*/public Long getTaskTime(Integer code, String value);
}
package com.zzh.mybatisplus5.mq;/*** 延迟消息处理器*/
public interface DelayedMessageHandler {/**** @param value* @return 处理成功的返回大于0结果,失败返回0*/public int handle(String value);public int getCode();
}

延时队列设计思路

和我之前使用策略模式封装的多个OSS使用,写法简直是一毛一样,详情挪步。策略模式调优(多Oss存储导致代码冗余的问题)在这里插入图片描述

延时队列消息丢失怎么解决

开个定时任务,每隔一分钟定时进行扫表,加了索引、延时消息丢失不多的情况下,查数据会很快。扫到有超时的订单,接着丢到 Redis 延时队列里面,双重保险。同时定时任务加个分布式锁,一台机器运行即可。​代码爆红是因为,我拉的开源项目,没跑起来直接看的源码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3223656.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

2024.7.9作业

1、提示并输入一个字符串&#xff0c;统计该字符串中字母、数字、空格以及其他字符的个数 #include <stdio.h> #include <string.h> int main(int argc,const char *argv[]) { char arr[30]{0}; int zm0,kg0,sz0,qt0; printf("请输入字符串&…

utf8mb4和utf8的不同、若依框架,代码生成器,gitee,前端vue的下载、修复和启动(寻求大佬帮助若依框架三、2.3)

2024.7.9 一、数据库的排序和统一问题。utf8mb4和utf8的不同1.1 发现问题1.2 解决问题-在idea中用sql生成器&#xff0c;生成sql语句&#xff0c;然后在里面修改1.3 utf8和utf8mb4的区别 二、若依前后端框架。代码生成器&#xff08;还没研究懂&#xff0c;但有三个方案&#x…

微软清华提出全新预训练范式,指令预训练让8B模型实力暴涨!实力碾压70B模型

现在的大模型训练通常会包括两个阶段&#xff1a; 一是无监督的预训练&#xff0c;即通过因果语言建模预测下一个token生成的概率。该方法无需标注数据&#xff0c;这意味着可以利用大规模的数据学习到语言的通用特征和模式。 二是指令微调&#xff0c;即通过自然语言指令构建…

Python基础-成年人判断(if条件语句联系)

注意输入的年龄需要转化为字符串 代码&#xff1a; print("欢迎来到游乐场&#xff1a;儿童免费&#xff0c;成人收费") age int(input("请输入你的年龄:")) if age>18:print("你已经成年&#xff0c;需要补票10元") # 四个空格缩进print…

使用Mplayer实现MP3功能

核心功能 1. 界面设计 项目首先定义了一个clearscreen函数&#xff0c;用于清空屏幕&#xff0c;为用户界面的更新提供了便利。yemian函数负责显示主菜单界面&#xff0c;提供了包括查看播放列表、播放控制、播放模式选择等在内的9个选项。 2. 文件格式支持 is_supported_f…

gpt-4o看图说话-根据图片回答问题

问题&#xff1a;中国的人口老龄化究竟有多严重&#xff1f; 代码下实现如下&#xff1a;&#xff08;直接调用openai的chat接口&#xff09; import os import base64 import requests def encode_image(image_path): """ 对图片文件进行 Base64 编码 输入…

Nacos2.X 配置中心源码分析:客户端如何拉取配置、服务端配置发布客户端监听机制

文章目录 Nacos配置中心源码总流程图NacosClient源码分析获取配置注册监听器 NacosServer源码分析配置dump配置发布 Nacos配置中心源码 总流程图 Nacos2.1.0源码分析在线流程图 源码的版本为2.1.0 &#xff0c;并在配置了下面两个启动参数&#xff0c;一个表示单机启动&#…

pytest-yaml-sanmu(六):YAML数据驱动测试

如果说 pytest 中哪些标记使用得最多&#xff0c;那无疑是 parametrize 了&#xff0c; 它为用例实现了参数化测试的能力&#xff0c;进而实现了数据驱动测试的能力。 1. 使用标记 parametrize 的使用需要提高两个内容&#xff1a; 参数名 参数值 pytest 在执行用例时&…

【LeetCode刷题笔记】LeetCode.11.盛最多水的容器

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 更多算法知识专栏&#xff1a;算法分析&#x1f525; 给大家跳段街舞感谢…

020-GeoGebra中级篇-几何对象之点与向量

本文概述了在GeoGebra中如何使用笛卡尔或极坐标系输入点和向量。用户可以通过指令栏输入数字和角度&#xff0c;使用工具或指令创建点和向量。在笛卡尔坐标系中&#xff0c;示例如“P(1,0)”&#xff1b;在极坐标系中&#xff0c;示例如“P(1;0)”或“v(5;90)”。文章还介绍了点…

Python大数据分析——决策树和随机森林

Python大数据分析——决策树和随机森林 决策树决策树节点字段的选择信息熵条件熵信息增益信息增益率 基尼指数条件基尼指数基尼指数增益 决策树函数 随机森林函数 决策树 图中的决策树呈现自顶向下的生长过程&#xff0c;深色的椭圆表示树的根节点&#xff1b;浅色的椭圆表示树…

Raylib 实现超大地图放大缩小与两种模式瓦片地图刷新

原理&#xff1a; 一种刷新模式&#xff1a; 在宫格内整体刷新&#xff0c;类似九宫格移动到边缘&#xff0c;则九宫格整体平移一个宫格&#xff0c;不过这里是移动一个瓦片像素&#xff0c;实际上就是全屏刷新&#xff0c;这个上限是 笔记本 3060 70帧 100*100个瓦片每帧都…

思维+并查集,1670C - Where is the Pizza?

一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 1670C - Where is the Pizza? 二、解题报告 1、思路分析 考虑两个数组a&#xff0c;b的每个位置只能从a&#xff0c;b中挑一个 不妨记posa[x]为x在a中位置&#xff0c;posb同理 我们假如位置i挑选a[i]&a…

Java--instanceof和类型转换

1.如图&#xff0c;Object&#xff0c;Person&#xff0c;Teacher&#xff0c;Student四类的关系已经写出来了&#xff0c;由于实例化的是Student类&#xff0c;因此&#xff0c;与Student类存在关系的类在使用instanceof时都会输出True&#xff0c;而无关的都会输出False&…

小试牛刀--对称矩阵压缩存储

学习贺利坚老师对称矩阵压缩存储 数据结构实践——压缩存储的对称矩阵的运算_计算压缩存储对称矩阵 a 与向量 b 的乘积-CSDN博客 本人解析博客 矩阵存储和特殊矩阵的压缩存储_n阶对称矩阵压缩-CSDN博客 版本更新日志 V1.0: 对老师代码进行模仿 , 我进行名字优化, 思路代码注释 …

ARM裸机:一步步点亮LED(汇编)

硬件工作原理及原理图查阅 LED物理特性介绍 LED本身有2个接线点&#xff0c;一个是LED的正极&#xff0c;一个是LED的负极。LED这个硬件的功能就是点亮或者不亮&#xff0c;物理上想要点亮一颗LED只需要给他的正负极上加正电压即可&#xff0c;要熄灭一颗LED只需要去掉电压即可…

2024 Q3 NAND闪存价格|企业级依然猛涨,消费级放缓

在企业领域持续投资于服务器基础设施&#xff0c;特别是在人工智能应用的推动下&#xff0c;企业级SSD需求增加的同时&#xff0c;消费电子市场却依旧疲软。加之NAND供应商在2024年下半年积极扩大生产&#xff0c;预计到2024年第三季度&#xff0c;NAND闪存供应充足率将上升至2…

jQuery 笔记

一、什么是jQuery 框架&#xff1a;半成品软件 Jquery就是封装好的js 本质上还是js jQuery是一个快速、简洁的JavaScript**框架**&#xff0c;是继Prototype之后又一个优秀的**JavaScript代码库**&#xff08;*或JavaScript框架*&#xff09;。 JQuery:封装好的代码库。有一…

程序设计——领域驱动设计

程序设计的所有原则和方法论都是追求一件事——简单——功能简单、依赖简单、修改简单、理解简单。因为只有简单才好用&#xff0c;简单才好维护。因此&#xff0c;不应该以评论艺术品的眼光来评价程序设计是否优秀&#xff0c;程序设计的艺术不在于有多复杂多深沉&#xff0c;…

JVM原理(二三):JVM虚拟机线程安全的实现方法

1. 互斥同步 互斥同步(MutualExclusion&Synchronization)是一种最常见也是最主要的并发正确性保障手段。同步是指在多个线程并发访问共享数据时&#xff0c;保证共享数据在同一个时刻只被一条(或者是一些&#xff0c;当使用信号量的时候)线程使用。而互斥是实现同步的一种…