延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

一、接着上文

上文我们讲述了使用redisson的RDelayedQueue实现分布式延迟队列,本文我们将自己JDK的延迟队列DelayQueue实现。

相比前者的实现,作为进程内的延迟队列,它会遇到许多技术难点:

  • 如何支持分布式的多个节点部署场景
  • 应用重启会恢复延时队列
  • 冷数据如何转换为热数据
  • 如何删除延迟队列中的任务

随后,我们也将提及:

  • 保存任务至延迟队列(生产者)
  • 读取延迟队列中的任务(消费者)

二、设计概要

在这里插入图片描述

  • 冷数据:mysql表中的任务数据

  • 热数据:jdk 延迟队列中的任务

  • 广播事件:删除延迟队列中的任务,发布的是广播事件,可以使用redis topic实现。

  • 本地事件:分布式多节点部署的时候,每个任务只保存在其中一个节点的延迟队列中,可以使用spring事件驱动实现。

  • 延迟队列 DelayQueueJob, 它实现了接口Delayed

包括任务的交易流水号和过期时间(即任务的回调时间)

import lombok.Builder;
import lombok.Data;import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author xxx*/
@Builder
@Data
public class DelayQueueJob implements Delayed {/*** 交易流水号*/private String transNo;/*** 到期时间*/private Date expireDate;public DelayQueueJob(String transNo, Date expireDate) {super();this.transNo = transNo;this.expireDate = expireDate;}/*** 用于队列中排序过期时间** @param o* @return*/@Overridepublic int compareTo(Delayed o) {return Long.valueOf(this.expireDate.getTime()).compareTo(Long.valueOf(((DelayQueueJob) o).expireDate.getTime()));}/*** 用于获取过期时间* 延迟关闭时间 = 过期时间 - 当前时间** @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return this.expireDate.getTime() - System.currentTimeMillis();}
}

三、应用启动流程

解决恢复延迟队列的问题。因为DelayQueue是进程内的,一旦重启,将被销毁。

在这里插入图片描述

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Service
@RequiredArgsConstructor
public class ApplicationStartupListener implements ApplicationListener<ApplicationReadyEvent> {@Overridepublic void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 实现代码参考上面的流程图}
}

四、定时任务流程

解决冷数据如何转换为热数据的问题,防止延时任务过多导致消耗过多的jvm内存,所以只有回调时间将近的任务才放入延迟队列。

在这里插入图片描述

五、如何删除延迟队列中的任务

删除延迟队列的任务:发送广播消息通知所有的节点,当不是当前节点的时候,执行删除。

if (!NetUtil.getLocalhostStr().equals(ipAddress)) {DelayQueueSingleton.getDelayQueue().remove(transNo);
}

DelayQueueSingletons是一个单例类,详见下:

public class DelayQueueSingleton {private static volatile CustomDelayQueue<DelayQueueJob> delayQueue;private DelayQueueSingleton() {}public static CustomDelayQueue<DelayQueueJob> getDelayQueue() {if (delayQueue == null) {synchronized (DelayQueueSingleton.class) {if (delayQueue == null) {delayQueue = new CustomDelayQueue<>();}}}return delayQueue;}}

这里为了删除延迟队列的任务,我们对DelayQueue进行了重写。


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;public class CustomDelayQueue<T extends Delayed> {private final DelayQueue<T> queue = new DelayQueue<>();private final Map<String, T> map = new ConcurrentHashMap<>();public boolean put(T task, String taskId) {// 如果任务已存在,则删除旧任务,防止重复添加this.remove(taskId);map.put(taskId, task);return queue.add(task);}public boolean remove(String taskId) {// 先删除map,再删除queueT task = map.remove(taskId);if (task != null) {return queue.remove(task);}return false;}public T take() throws InterruptedException {return queue.take();}
}

六、保存任务至延迟队列(生产者)


// 如果通知时间在一定时间范围内
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {DelayQueueSingleton.getDelayQueue().put(DelayQueueJob.builder().transNo(event.getTransNo()).expireDate(event.getNotifyDate()).build(), event.getTransNo());}

七、读取延迟队列中的任务(消费者)

作为延迟队列的消费者,它的实现和上一篇文章实现类似。不同的是take()获取任务不一样。

String transNo = null;
Date notifyDate = null;DelayQueueJob job = DelayQueueSingleton.getDelayQueue().take();
if (null != job) {transNo = job.getTransNo();notifyDate = job.getExpireDate();
}if (null == transNo) {return;
}if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={},notifyDate={}", transNo, notifyDate);
}// 异步执行你的操作
notifyTaskService.handleTask(transNo, notifyDate);

八、总结

作为进程内的延迟队列,在多点部署的分布式集群环境下, 代码明显比上一篇要复杂得多。

它们都需要的步骤是:

  • 任务的生产
  • 任务的消费
  • 移除任务

DelayQueue额外多出来的步骤是:

  • 应用启动的时候拉取回调时间将近的未完成任务(更新marked标记为true,防止重复拉取冷数据)
  • 定时拉取未标记且回调时间将近的未完成任务(和上面必须是互斥,等待上一步执行完成,否则会导致重复拉取)
  • 删除延迟队列DelayQueue的任务,必须发布广播消息给全部节点。(引入广播消息机制)

由此可见,任务表的字段marked仅供DelayQueue使用,防止重复拉取数据库的任务到热数据区。

    @Column(name = "marked", nullable = false, columnDefinition = "TINYINT(1) default 0 COMMENT '是否已标记为热数据'")private Boolean marked;

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3016173.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

matplotlib和pandas与numpy

1.matplotlib介绍 一个2D绘图库&#xff1b; 2.Pandas介绍&#xff1a; Pandas一个分析结构化数据的工具&#xff1b; 3.NumPy 一个处理n纬数组的包&#xff1b; 4.实践&#xff1a;绘图matplotlip figure()生成一个图像实例 %matplotlib inline&#xff1a;图形直接在…

就业班 第三阶段(redis) 2401--5.7 day2 redis2 哨兵(前提是做好了主从)+redis集群

1、设置密码&#xff08;redis&#xff09; 先在redis.conf里面找到这个 后面写上要设置的密码即可 2、哨兵模式 监控redis集群中master状态的的工具 在做了主从的前提下 主 从1 从2 作用 1)&#xff1a;Master状态检测 2)&#xff1a;如果Master异常&#xff0c;则会进行…

Linux--基础IO(文件描述符fd)

目录 1.回顾一下文件 2.理解文件 下面就是系统调用的文件操作 文件描述符fd&#xff0c;fd的本质是什么&#xff1f; 读写文件与内核级缓存区的关系 据上理论我们就可以知道&#xff1a;open在干什么 3.理解Linux一切皆文件 4.C语言中的FILE* 1.回顾一下文件 先来段代码…

数据结构——链表(精简易懂版)

文章目录 链表概述链表的实现链表的节点&#xff08;单个积木&#xff09;链表的构建直接构建尾插法构建头插法构建 链表的插入 总结 链表概述 1&#xff0c;链表&#xff08;Linked List&#xff09;是一种常见的数据结构&#xff0c;用于存储一系列元素。它由一系列节点&…

Mysql查询语句(一)简单查询和简单条件查询

MySQL的所有语句中&#xff0c;我们日常用的最多的其实就是查询语句。因此这篇文章主要介绍查询语句中的一些基础语法。 目录 简单查询 简单条件查询 简单查询 最简单的查询语句的语法如下所示&#xff1a; SELECT * FROM student; 它的语法解析如下&#xff1a; SELECT关…

学习笔记:【QC】Android Q qmi扩展nvReadItem/nvWriteItem

一、qmi初始化 流程图 初始化流程: 1、主入口&#xff1a; vendor/qcom/proprietary/qcril-hal/qcrild/qcrild/rild.c int main(int argc, char **argv) { const RIL_RadioFunctions *(*rilInit)(const struct RIL_Env *, int, char **); rilInit RIL_Init; funcs rilInit…

122. Kafka问题与解决实践

文章目录 前言顺序问题1. 为什么要保证消息的顺序&#xff1f;2.如何保证消息顺序&#xff1f;3.出现意外4.解决过程 消息积压1. 消息体过大2. 路由规则不合理3. 批量操作引起的连锁反应4. 表过大 主键冲突数据库主从延迟重复消费多环境消费问题后记 前言 假如有家公司是做餐饮…

无法添加以供审核,提交以供审核时遇到意外错误。如果问题仍然存在,请联系我们

遇到问题&#xff1a; 无法添加以供审核 要开始审核流程&#xff0c;必须提供以下项目&#xff1a; 提交以供审核时遇到意外错误。如果问题仍然存在&#xff0c;请联系我们。 解决办法&#xff1a; 修改备案号为小写&#xff0c; 例如&#xff1a;京ICP备2023013223号-2A 改…

自然语言(NLP)

It’s time for us to learn how to analyse natural language documents, using Natural Language Processing (NLP). We’ll be focusing on the Hugging Face ecosystem, especially the Transformers library, and the vast collection of pretrained NLP models. Our proj…

运动控制“MC_MoveVelocity“功能块详细应用介绍

1、运动控制单位u/s介绍 运动控制单位[u/s]介绍-CSDN博客文章浏览阅读91次。运动控制很多手册上会写这样的单位,这里的u是英文单词unit的缩写,也就是单位的意思,所以这里的单位不是微米/秒,也不是毫米/秒,这里是一个泛指,当我们的单位选择脉冲时,它就是脉冲/秒,也就是…

【排序算法】第四章:归并排序(近万字讲解,通俗易懂)

归并排序 归并排序本质就是一种思想&#xff0c;在很多题目都可以用到 一、归并排序的原理 归并排序&#xff08;MergeSort&#xff09; 是建立在归并操作上的一种有效的排序算法&#xff0c;采用分治法排序&#xff0c;分为分解、合并两个步骤。 分解&#xff1a;将数组分割…

GESP一级考试笔记(C++)

考纲 GESP C 一级考纲 一、计算机基础知识 二、变量 1.变量的声明 想要使用变量&#xff0c;必须先做“声明”&#xff0c;也就是告诉计算机要用到的数据叫什么名字。变量声明的标准语法可以写成&#xff1a;数据类型 变量名; #include <iostream> using namespace s…

速览Coinbase 2024Q1 财报重点:业务全面开花,净利润达11.8亿美元

作者&#xff1a;范佳宝&#xff0c;Odaily 星球日报 近期&#xff0c;Coinbase 发布了其 2024 年第一季度财报。 报告显示&#xff0c;Coinbase 第一季度营收为 16.4 亿美元&#xff0c;高于分析师平均预期的 13.4 亿美元&#xff1b;净利润为 11.8 亿美元&#xff0c;合每股…

PyCharm怎么安装Comate与使用示范

目录 简单介绍Comate 安装步骤详解 Comate使用示范详解 使用总结 简单介绍Comate Baidu Comate智能编码助手是一款基于文心大模型打造的编码辅助工具&#xff0c;具备多重优势&#xff0c;包括代码智能、应用场景丰富、创造价值高、广泛应用等。它能帮助开发者提升编码效率…

LeetCode算法题:8.字符串转换整数 (atoi)

请你来实现一个 myAtoi(string s) 函数&#xff0c;使其能将字符串转换成一个 32 位有符号整数&#xff08;类似 C/C 中的 atoi 函数&#xff09;。 函数 myAtoi(string s) 的算法如下&#xff1a; 读入字符串并丢弃无用的前导空格检查下一个字符&#xff08;假设还未到字符末…

Java:内存模型

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 一、Java内存模型出现的背景 二、什么是Java内存模型 三、Java内存模型的底层实现 总结 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一…

linux系统 虚拟机的安装详细步骤

window&#xff1a; (1) 个人&#xff1a;win7 win10 win11 winxp (2)服务器&#xff1a;windows server2003 2008 2013 linux&#xff1a; (1)centos7 5 6 8 (2)redhat (3)ubuntu (4)kali 什么是linux: 主要是基于命令来完成各种操作&#xff0c;类似于DO…

【黑客不回去做的三件事情】

作为黑客不会去做的三件事 黑客在认知上和普通人有什么区别&#xff1f;什么行为是自己觉得很正常&#xff0c; 但是大部分黑客并不会这么做的。今天来聊聊作为黑客&#xff0c;有哪些日常生活中不会去做的事&#xff0c; 第一&#xff0c;黑客不会用同一个密码&#xff0c;并…

开源模型应用落地-CodeQwen模型小试-探索更多使用场景(三)

一、前言 代码专家模型是基于人工智能的先进技术&#xff0c;它能够自动分析和理解大量的代码库&#xff0c;并从中学习常见的编码模式和最佳实践。这种模型可以提供准确而高效的代码建议&#xff0c;帮助开发人员在编写代码时避免常见的错误和陷阱。 通过学习代码专家模型&…

vivado Zynq UltraScale+ MPSoC 比特流设置

Zynq UltraScale MPSoC 比特流设置 下表所示 Zynq UltraScale MPSoC 器件的器件配置设置可搭配 set_property <Setting> <Value> [current_design] Vivado 工具 Tcl 命令一起使用。