深入浅出消息队列----【延迟消息的实现原理】

深入浅出消息队列----【延迟消息的实现原理】

  • 粗说 RocketMQ 的设计
  • 细说 RocketMQ 的设计
  • 这样实现是否有什么问题?

本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】

粗说 RocketMQ 的设计

RocketMQ 约定了一些延迟时间,即生产者无法灵活的自定义延迟时间,而是固定的几个延迟时间来供生产者选择。

请添加图片描述
这样延迟消息就有了统一归类和约束,便于管理和调配。

虽说归类了延迟消息,但是同一个延迟 level 的延迟消息共用一个闹钟也是无法满足需求的。

所以变成专门雇一个“人”,每个“人”管一个 level 的延迟消息,定时查看是否有到期的消息,如果到了立马让消息给消费者消费。

至于复用 commitlog 这一套的问题,专门搞个存放延迟消息的 Topic,延迟消息先发往这个 Topic,消费者并不会订阅这个 Topic,因此此时消费者无法消费到这个消息。

等到延迟消息到达时间后,Broker 将这个延迟消息发往原 Topic,此时消费者就能从原 Topic 消费到这条消息!

也就是说 Broker 自己建立一个专门 Topic 用来存放延迟消息,此时延迟消息的存储能复用 commitlog 这一套模型,消息也会被分发到 consumerQueue。

不同的延迟 level 的消息回存放到这个 Topic 不同的队列中,也就是说这个 Topic 一个有 18 个队列对应 18 个 level。

请添加图片描述

然后会有一个定时线程去每个队列按序检查消息是否都到时间了,如果到了就发到消息原先的 Topic 中。

请添加图片描述

细说 RocketMQ 的设计

延迟消息的发送很简单,仅需设置一个 delayTimeLevel 即可:

Message message = new Message("TestTopic",("Hello scheduled message" + i).getBytes());
message.setDelayTimeLevel(3);
producer.send(message);

Broker 收到这个消息后,一看 delayTimeLevel 设置了值,那么就知道它是一个延迟消息,于是乎直接来个偷梁换柱!

把消息的原 Topic 和对应队列 ID 保存在消息扩展属性里面。

然后把这条消息的 Topic 设置成 SCHEDULE_TOPIC_XXXX,没错 Topic 的名字就是 SCHEDULE_TOPIC_XXXX哈,后面就是 XXXX

并且根据消息的 Level 选择 SCHEDULE_TOPIC_XXXX 下对应的队列。

请添加图片描述

这样一来延迟消息就存储好了。

然后 Broker 起了一个定时线程池,里面一共有 18 个核心线程,这个线程池的任务就是定时调度 SCHEDULE_TOPIC_XXXX 下的每个队列的消息,一旦有到期的消息,就分发到原 Topic 供消费者消费。

具体的做法是在初始时,每个队列都会对应被创建一个任务扔到线程池中,这些任务的内容就是根据传入的队列 ID,得到对应的 consumeQueue,当然还有对应的 offset。

请添加图片描述

Broker 会定时保存 SCHEDULE_TOPIC_XXXX 下 consumeQueue 的消费 offset。

得到 consumeQueue 和 offset,对应的就能获取延时消息,这时候将延迟时间跟当前时间对比,就能判断是否到期。

如果到期了,就从消息扩展属性里面获取原 Topic 和对应队列 ID,然后投递到原队列中。

上面的图表就是这个意思,这里再贴一下:

请添加图片描述

然后再代码上的实现是立马新建一个任务扔到线程池中,延迟时间是 1000ms,任务的入参会塞入更新后的 offset,这样线程就会继续消费后面的消息,如此往复循环。

当然,如果拿到的对应延迟消息还未到时间,那么 offset 不变,也立马新建一个任务塞入到线程池中,这样 1000,s 后又会来看这个消息是否到期。

可以看到,整个延迟消息设计就加了一个线程池,很巧妙地复用了正常消息的 commitlog 和 consumeQueue 的存储机制,且利用发布订阅的特性,改变了消息的 Topic 来使得消费者无法消费到未到时间的消息。

到时间了又投递回原 Topic 使得消费者可以消费到到期的消息,非常 nice!

这样实现是否有什么问题?

从实现层面来看,大大减少了延迟消息开发的复杂度,但是这样的实现对延迟时间来说是不准的

首先,同一个延迟 level 的消息都是入同一个队列,然后上一个延迟消息处理完之后继续处理下一个,如果同一时刻有大量的同一个 level 的延迟消息产生,那么它们都堆积在一个队列里面,一个一个处理,这样一来即使后面的消息到时间了也得排队等着。

这样的机制就做不到非常实时。

并且从 SHEDULE_TOPIC_XXXX 分发至原 Topic 之后,假设原 Topic 本身就已经有很多消息堆积了,那么等消费者消费到这条消息的时候,时间也有大大的延迟。

请添加图片描述

当然,本身在大流量下对时间的把控是无法做到很准确的,不论是什么方法,都会有延迟,无非是延迟精度多少的问题。

有一种比较好的定时结构就是时间轮了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3281455.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

记录下Xjar部署问题

记录下 java Xjar部署问题 XjarXjar是什么?静态资源问题处理 项目是部署在客户端springboot项目,打包jar后静态资源都范文异常 net::ERR_CONTENT_LENGTH_MISMATCH 200 Xjar Xjar是什么? 无需侵入代码,只需要把编译好的JAR包通过…

springboot家校共育平台-计算机毕业设计源码54235

摘 要 采用高效的SpringBoot框架,家校共育平台为家长与教师提供了便捷的沟通渠道。该平台整合了丰富的教育资源,实现了家校之间的即时信息互通,从而助力协同教育。 为进一步方便用户访问和使用,平台与微信小程序进行了深度整合。家…

测试人生 | 招聘严峻期从面试无力感,到一天2个offer的一些经验分享(内附美团、字节、快手等面试题)

本人是霍格沃兹北京测试开发学社线下3期学员,拥有3年测试工作经验,之前一直在某大厂外包做软件测试,期间主要是以功能测试为主。 经过一个月的高强度找工作奋战,最终拿下了3家公司offer,选择了一家自己很满意的公司。…

4000元投影仪性价比之王:爱普生TW5750极米RS10还是当贝X5S?

买投影很多人会倾向于买大品牌或者是销量最好的那几款,首先是大品牌售后更有保障,口碑和销量也间接证明了这款投影是否值得买。这几年国内投影市场中爱普生、极米、当贝这三家投影品牌无论是在产品、口碑、售后服务等方面都是最好的,被用户们…

点对点的RPC通信功能测试(bug修复)

1.发现问题 处理rpc调用client客户端存在一些问题,数据反序列化的问题 rpc的调用方,数据的处理有些问题,我们返回的是true,应该是1,不是0. 返回值已经写道response里面。发回给调用方(calluserservice.&…

有向图的拓扑排序以及判断是否有环

拓扑序列是顶点活动网中将活动按发生的先后次序进行的一种排列。 拓扑排序,是对一个有向无环图(Directed Acyclic Graph简称DAG)G进行拓扑排序,是将G中所有顶点排成一个线性序列,使得图中任意一对顶点u和v,若边(u,v)∈E(G)&#x…

【数据分享】全国县市2000-2022年教育、卫生和社会保障数据(免费获取)

《中国县域统计年鉴》是一部全面反映我国县域社会经济发展状况的资料性年鉴,收录了上一年度全国2000多个县域单位的基本情况、综合经济、农业、工业、教育、卫生、社会保障等方面的资料。 在之前的文章中,我们分享过基于2001-2023年《中国县域统计年鉴》…

idea自定义模版、快捷键

原文地址&#xff1a;【IDEA】常用插件、设置、注释_idea注释插件-CSDN博客 创建模版组&#xff1a;MyTemplates 创建模版&#xff1a;forThread&#xff1a;循环打印出10个线程 第四步 for (int i 1; i < 10; i) {new Thread(() -> {$END$}, String.valueOf(i)).star…

pytorch-广播机制

Broadcasting Key idea A[4,3] B[3] 在第一个维度前面插入一个维度 [3] > [1,3]将维度1扩展到与B维度1一样的尺寸 [1,3] > [4,3] broadcasting unsqueeze expand 为什么要使用broadcasting&#xff1f; 1、for example [class, student, scores]add bias for ever…

ESP8266 8x8点阵LED控制系统 日志2024/7/31

手机app: 内置主页配置 唯一不好的就是有一点问题就得全改一遍,来回修改格式很烦啊喂!~ 为什么要留个 主页控制? 有些人不是喜欢程序员的浪漫嘛,把index.html上传上去下次就是表白页面咯! 当然这只是鸡肋娱乐,真实功能其实就是用来美化html的, 如果不满意html 自己美化之…

JAVA后端拉取gitee仓库代码项目并将该工程打包成jar包

公司当前有一个系统用于导出项目&#xff0c;而每次导出的项目并不可以直接使用&#xff0c;需要手动从gitee代码仓库中获取一个模板代码然后将他们整合到一起它才是一个完整的项目&#xff0c;所以目前我的任务就是编写一个java程序可以自动地从gitee仓库拉取下来那个模板代码…

git学习准备阶段

准备阶段 ubantu下载安装git sudo apt-get install git查看git版本 git -v注册用户名 git config --global user.name [name][name]填入自己的名字&#xff0c;如果没有空格的情况下&#xff0c;可以不加引号,–global是在全局下操作&#xff0c;如果没有这个参数就只是在本…

sdwan

分支互联网络解决方案 - 华为企业业务 分支互联网络解决方案 随着5G、AI、物联网等新兴技术与云紧密结合&#xff0c;企业业务智能化和云化加速。 企业分支WAN流量激增&#xff0c;传统以MPLS专线为主的广域互联网络难以支撑业务发展。SD-WAN成为应对云时代的必然选择。 SD…

2024电脑桌面能提醒的备忘录app分享

随着科技的飞速进步&#xff0c;2024年的今天&#xff0c;我们已经拥有了众多高效便捷的软件工具&#xff0c;其中&#xff0c;备忘录app更是成为了我们日常生活中不可或缺的一部分。在繁忙的工作和生活中&#xff0c;我们需要一个得力的助手来帮助我们记录重要事务&#xff0c…

【ROS 最简单教程 006/300】使用 launch 启动多个 ROS 节点

使用 launch 文件&#xff0c;可以一次性启动多个 ROS 节点 launch 文件编写的语法规则参见 &#x1f449; launch 文件编写 &#x1f49c; &#x1f49c; &#x1f49c; &#x1f49c; &#x1f49c; 简单示例如下 不使用 launch 需要启动三个命令行终端窗口&#xff0c;分别…

时常在面试中被问到的JVM问题

文章目录 JVM 和 JDK、JRE 有什么区别&#xff1f;JVM 是如何工作的&#xff1f;JVM 主要组件JVM 执行流程JVM 的工作示例 说一下类加载机制类加载器&#xff08;Class Loader&#xff09;示例 什么是双亲委派模型&#xff1f;&#xff08;Parent Delegation Model&#xff09;…

多语种语音合成数据,拓宽语音大模型边界

近期&#xff0c;一个名为 ChatTTS 的文本转语音项目爆火出圈&#xff0c;在 GitHub 上已经斩获了 28 k 的 Star 量。 作为一款专门为对话场景设计的语音生成模型&#xff0c;ChatTTS 支持英文和中文两种语言。针对对话式任务进行了优化&#xff0c;实现了自然流畅的语音合成。…

移动光猫(UNG853H)获取超级帐号和密码

1.查看光猫背部的登录地址及帐密码&#xff1b;比如我的光猫&#xff1a; http://192.168.1.1 User: user password: ****** 2.启动telnet服务&#xff0c;使用以下命令&#xff1a; http://192.168.1.1/webcmcc/telnet.html 3.使用telnet登录光猫&#xff0c;在CMD下执行&…

做微课的软件有哪些?教师专用录微课软件分享

在这个数字化教育时代&#xff0c;微课以其短小精悍、针对性强的特点&#xff0c;成为了教师们提升教学质量、促进学生自主学习的得力助手。制作高质量的微课&#xff0c;离不开一款功能强大、操作简便的录屏软件&#xff0c;今天&#xff0c;就让我们一起探索几款专为教师设计…

赢单!诸葛打造高效埋点体系,加速城商行营销效率

用户行为数据已成为银行了解客户需求、优化服务流程、提升营销效率的重要支持。某城商行作为一家具有前瞻性的金融机构&#xff0c;其现有的用户行为数据采集分析系统无法满足当下业务发展需求&#xff0c;用户数据的准确性、易用性和实效性亟待提升。 经过严格对诸葛智能埋点…