RocketMQ快速实战以及集群架构原理详解

RocketMQ快速实战以及集群架构原理详解

组成部分

  • 启动Rocket服务之前要先启动NameServer

image.png

  • NameServer
    • 提供轻量级Broker路由服务,主要是提供服务注册
  • Broker
    • 实际处理消息存储、转发等服务的核心组件
  • Producer
    • 消息生产者集群,通常为业务系统中的一个功能模块
  • Consumer
    • 消息消费者集群,通常是业务系统中的一个功能模块
  • Topic
    • 区分消息的种类,生产端可以发送消息给一个或多个topic,消费端可以进行一个或多个消息进行消费

集群中的角色

  • Producer
    • 消息发送者(寄信人),在生产者中会把同一类生产者组成一个集合,称之为生产者组,这类生产者发送同一类消息且发送逻辑一致,如果发送的是事务消息是原始生产者在发送之后崩溃,则Broker服务会联系同一生产组的其它生产者来提交或回溯消费
  • Consumer
    • 消息接受者(收信人),消费者同样会把一类消费者组成一个集合,称之为消费者组,这类消费者消费同一类消息且消费逻辑一致,消费者组在消息消费方面,实现负载均衡和容错非常容易,消费组中的消费者必须订阅相同的topic
    • RocketMQ支持两种消息模式
      • 集群消费模式
        • 相同消费者组下的每个消费者平摊消息
      • 广播消费模式
        • 相同消费者组的每个消费者接受全量消息
  • Broker Server
    • 暂存和传输消息(邮局),也存储消息相关的元数据信息(包括消费者组、消费进度偏移、主题、队列消息等),Broker Server是RocketMQ真正的业务核心
      • 子模块
        • Remoting Module
          • 整个Broker的实体,负责处理来自Client端的请求
        • Client Manager
          • 负责管理客户端以及维护消费者订阅的topic信息
        • Store Service
          • 提供方便简单的API接口处理消息存储到物理硬盘的查询功能
        • HA Service
          • 高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能
        • Index Service
          • 根据特定的Message Key对投递到Broker的消息进行索引服务,以提供消息的快速查询
      • Broker架构模式
        • 普通集群
          • 每个节点分配一个固定的角色,master负责响应客户端的写以及存储消息,slave只负责对master的消息进行同步以及响应客户端的读
            • 消息同步方式分为同步和异步
        • Dledger高可用集群
          • 基于Raft协议随机选举出一个master,而master挂了之后,会从slage中自动选举出一个节点作为新master
          • Dledger的职责
            • 接管Broker的Commitlog的消息存储
            • 从集群中选举出master节点
            • 完成master节点往slave节点的消息同步
  • Name Server
    • 管理Broker(邮局的管理机构),Broker Server启动时会向所有的Name Server注册自己的服务信息,并且后续通过心跳来保证服务信息的实时性,生产者或消费者可以通过名称服务查找各个topic响应的Broker IP列表,多个Name Server实例组成集群(AP模式),但相互独立,没有信息互换,意味着Name Server中任意的节点挂了,只要有一台服务节点正常,整个路由服务不会受影响
  • Topic
    • 区分消息的种类,一个发送者可以发送消息给一个或多个topic,一个消息的接受者可以订阅一个或多个topic消息,同一个topic下的数据,会分片存储到不同的Broker上,而每一个分片单位MessageQueue(类似于Kafka中的Partition)
  • MessageQueue
    • 相当于Topic中的分区,用于并行发送和接收消息,每个Topic默认有4个MessageQueue

消息确认机制

  • 消息生产端采用消息确认多次重试的机制来保证消息能发送到MQ

    • 3种发送消息的方式

      • 同步发送

        • 必须等到Broker反馈之后才能继续发,安全性最高但发消息最慢
      • 单向发送

        • 不管消息是否发成功都能继续发,所以吞吐量最高,但是安全性低,容易丢消息
      • 异步发送

        • 发送消息的同时回注册一个回调去处理响应,安全性低,容易丢消息
  • 消息消费者端采⽤状态确认机制保证消费者⼀定能正常处理对应的消息

    • Broker会通过记录重试次数,为了不影响topic下其它正常的消息,会给每个消费组设计对应的重试topic,在消息重试时,会将原topic的消息移动到对应的重试topic中去,当重试达到一定阈值会将失败的消息推入到死信topic中
    • 消费者组由多个消费者实例组成,Broker只需要向某一个实例推送消息即可,保障消息重试机制正常运行,并且Broker只通过消费者返回的状态来判断是否处理成功,但是业务执行是否正确是无法知道的
  • 消费者也可以⾃⾏指定起始消费位点

    • Broker通过消费者返回的状态来推进消费者组对应的消息offset,虽然offset是Broker来维护,但是消费者可以自己指定offset进行消费

消息模型

顺序消息

  • 只能保证局部消息有序,不能保证全局有序,要保证全局有序需要从生产端、Broker、消费端三个角度同时考虑才行
    • 生产端
      • 默认情况下,生产端采用轮询将消息投递到不同的MessageQueue种,而消费端会从多个MessageQueue中拉取消息,所以这种情况下是无法保证顺序的,所以只有让一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这组消息有序
    • Broker
      • Broker中的一个MessageQueue是可以保证有序的
    • 消费端
      • 消费端会从多个MessageQueue上拉取消息,此时每个MessageQueue的消息是有序的,但是多个MessageQueue直接混合到一起却是乱序的,如果想要保证消费有序,可以通过锁MessageQueue的方式,消费完一个MessageQueue再去消费下一个来保证
        • MessageListenerOrderly会锁队列,取完一个才能下一个
        • MessageListenerConcurrently不会锁队列,每次从多个MessageQueue取出一批数据(默认不超过32条)
  • 实现思路简概
    • 生产者只有将一批有序的消息放到同一个MessageQueue上,Broker才有可能保持这一批消息的顺序
    • 消费者只有一次锁定一个MessageQueue,拿到MessageQueue上消息
  • 注意点
    • 大部分业务场景下只要保证局部有序,如果要保证全局有序,只能保留一个MessageQueue,性能较低
    • 生产者端尽可能将有序消息打散到不同的MessageQueue上,避免数据热点竞争
    • 消费者端只能使用同步方式处理消息,不要使用异步处理,更不能自行批量处理
    • 消费者端只进行有限次数的重试,如果一条消息处理失败,RocketMQ会将后续消息阻塞,让消费者进行重试,但是如果消费者一直处理失败,超过最大重试次数,RocketMQ会跳过这条消息,直接处理后面的消息,导致消费乱序
    • 消费者端如果处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代

广播消息

  • 广播消息并没有特定的消费者,因为这涉及到消费者的集群消费模式,默认是集群模式
  • 实现思路简概
    • 集群模式
      • 一个消息只会被一个消费组中的多个消费者共同处理一次(平摊)
        • Broker端会给每个消费者组维护一个统一的offset来保证同一个消费组内只会被消费一次
    • 广播模式
      • 一个消息会被推送给所有消费者消费,不再关心消费组
        • Broker端只管推消息,消费端自己维护offset
  • 注意点
    • Broker端不维护消费进度,如果消费者处理消息失败了,将⽆法进⾏消息重试
    • 消费端自己维护offset可以在服务重启后继续之前的进度,消息丢了也不影响服务稳定性

延迟消息

  • RocketMQ给消息定制了18个默认的延迟级别,延迟消息的难点其实是性能,需要不断进⾏定时轮询,全部扫描所有消息是不可能的
  • 实现思路简概
    • RocketMQ预设一个系统topic(SCHEDULE_TOPIC_XXXX),在这个topic下有18个延迟队列,每次只针对这些队列里的消息进行延迟操作
  • 注意点
    • 预设延迟时间导致不灵活,后续版本已经支持预设一个具体的时间戳,不建议调整延迟级别对应的延迟时间

批量消息

  • 生产者端发送的消息过多时,可以将多条消息合并进行批量发送,减少网络IO,提升消息发送的吞吐量

  • 注意点

    • 只能对同一topic下的消息进行批量发送,不支持延迟消息,以及批量消息的大小不超过1MB(超过了自行拆分)

过滤消息

  • 同一topic下不同的消息,消费者只关注某一类消息,有简单过滤和SQL过滤方式
  • 实现思路简概
    • 简单过滤
      • ⽣产者端需要在发送消息时,增加Tag属性,消费者端/Broker端就可以通过这个Tag属性过滤出需要的消息
    • SQL过滤
      • ⽣产者端需要在发送消息时,增加Tag属性以及自定义的属性,消费者端/Broker端可以指定一个SQL进行过滤
  • 注意点
    • 消息过滤在消费者端和Broker端都可以做,消费者端进行过滤可以保障消息过滤的可控性,而Broker端过滤可以减少不必要数数据网络IO(只把消费者端需要的消息发送出去就行)
    • 在实际生产中,被过滤的消息并不会直接丢弃,会交给其它需要的消费者进行消费,如果一直没有消费者进行消费,Broker端会继续推进offset

事务消息

  • 通过RocketMQ的事务机制,来保障本地事务(比如数据库)与MQ消息发送的事务一致性(上下游的数据一致性)

  • 实现思路简概

    • 生产者端将消息发送到MQ服务端
    • MQ服务端将消息持久化成功之后,向生产者端反馈已发送成功,此时消息处于半事务消息状态(暂不能投递)
    • 生产者端开始执行本地事务逻辑
    • 生产者端根据本地事务执行结果向MQ服务端提交二次确认结果来判断是否提交或回滚
      • 提交
        • 服务端将半事务消息标记为可投递,将半事务消息转交给消费端
      • 回滚
        • 服务端将回滚事务,放弃将半事务消息转交给消费端
    • 当出现网络故障或生产者端重启时,若果MQ服务端未收到二次确认消息结果或收到的结果为未知状态,经过一定时间后,MQ服务端将对生产者组的任一生产者发送消息回查,生产者收到消息回查后,需要检查对应消息的本地事务执行最终结果,然后生产者端根据检查到的最终结果再次提交二次确认来判断是否提交或回滚
  • 注意点

    • 半消息是对消费者不可⻅的⼀种消息,RocketMQ的做法是将消息转到了⼀个系统Topic(RMQ_SYS_TRANS_HALF_TOPIC)
    • 事务消息中,本地事务默认回查次数15次,本地事务回查的默认间隔60秒,超过回查次数后,消息将会被丢弃
    • 事务消息不支持延迟消息和批量消息

最佳实战注意点

  • 合理分配Topic、Tag
    • ⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识,tags可以由应⽤⾃由设置,只有⽣产者在发送消息设置了tags,消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤
  • 使⽤Key加快消息索引
    • 分配好Topic和Tag之后,⾃然就需要优化Key属性了,因为Key也可以参与消息过滤,通常建议每个消息要分配 ⼀个在业务层⾯的唯⼀标识码,设置到Key属性中
      • 作用
        • 可以配合Tag进⾏更精确的消息过滤
        • Broker端会为每个消息创建⼀个hash索引,应⽤可以通过topic、key来查询某⼀条历史的消息内容,以及消息在集群内的处理情况,为了避免哈希冲突问题,客户端要尽量保证key的唯⼀性
  • 关注错误消息重试
    • RocketMQ消费者端,如果处理消息失败了,Broker是会将消息重新进⾏投送,⽽在重试时,每个消费者组创建⼀个对应的重试队列(“%RETRY%”+ConsumeGroup),多关注重试队列,可以及时了解消费者端的运⾏情况,如果这个队列中出现了⼤量的消息,就意味着消费者的运⾏出现了问题,要及时跟踪进⾏⼲预
    • RocketMQ默认允许每条消息最多重试16次(可以定制),如果消息重试16次后仍然失败,消息将不再投递。转为进⼊死信队列
  • ⼿动处理死信队列
    • 当⼀条消息消费失败,RocketMQ就会⾃动进⾏消息重试。⽽如果消息超过最⼤重试次数,RocketMQ就会认为这个消息有问题,RocketMQ不会⽴刻将这个有问题的消息丢弃,⽽会将其发送到这个消费者组对应的死信队列,此时需要人工去查看死信队列(%DLQ%+ConsumGroup)中的消息,对错误原因进行排查以及对死信进行处理(转发到正常的tipic进行重新消费或者丢弃)
    • 死信队列的特征
      • ⼀个死信队列对应⼀个ConsumGroup,⽽不是对应某个消费者实例
      • 如果⼀个ConsumeGroup没有产⽣死信,RocketMQ就不会为其创建相应的死信队列
      • 死信队列中的消息不会再被消费者正常消费
      • 死信队列的有效期跟正常消息相同,默认3天(可配置),超过这个最⻓时间的消息都会被删除,⽽不管消息是否消费过
  • 消费者端进⾏幂等控制
    • 在MQ系统中,对于消息幂等有三种实现语义
      • at most once 最多⼀次:每条消息最多只会被消费⼀次
        • 可以⽤异步发送、sendOneWay等⽅式就可以保证
      • at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
        • 可以⽤同步发送、事务消息等很多⽅式能够保证
      • exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次
        • RocketMQ只能保证at least once,保证不了exactly once
          • 云上版本支持
    • 消息幂等的必要性
      • 出现重复的情况
        • 发送时消息重复
          • 当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端
            应答失败, 如果此时⽣产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且消息ID也相同的消息
        • 投递时消息重复
          • 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断,为
            了保证消息⾄少被消费⼀次,Broker端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且消息ID也相同的消息
        • 负载均衡时消息重复(不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启)
          • 当 Broke端 或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息
      • 处理方式
        • 在RocketMQ中,是⽆法保证每个消息只被投递⼀次的,所以要在业务上⾃⾏来保证消息消费的幂等性,RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据,但是最好使用分布式ID来避免出现冲突

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2804750.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

xff注入 [CISCN2019 华东南赛区]Web111

打开题目 看见smarty 想到模板注入 又看见ip 想到xff注入 一般情况下输入{$smarty.version}就可以看到返回的smarty的版本号。该题目的Smarty版本是3.1.30 在Smarty3的官方手册里有以下描述: Smarty已经废弃{php}标签,强烈建议不要使用。在Smarty 3.1&#xff…

07 MyBatis之高级映射 + 懒加载(延迟加载)+缓存

1. 高级映射 例如有两张表, 分别为班级表和学生表 自然, 一个班级对应多个学生 像这种数据 , 应该如果如何映射到Java的实体类上呢? 这就是高级映射解决的问题 以班级和学生为例子 , 因为一个班级对应多个学生 , 因此学生表中必定有一个班级编号字段cid 但我们在学生的实体…

HarmonyOS学习--三方库

文章目录 一、三方库获取二、常用的三方库1. UI库:2. 网络库:3. 动画库: 三、使用开源三方库1. 安装与卸载2. 使用 四、问题解决1. zsh: command not found: ohpm 一、三方库获取 在Gitee网站中获取 搜索OpenHarmony-TPC仓库,在t…

Day20_网络编程(软件结构,网络编程三要素,UDP网络编程,TCP网络编程)

文章目录 Day20 网络编程学习目标1 软件结构2 网络编程三要素2.1 IP地址和域名1、IP地址2、域名3、InetAddress类 2.2 端口号2.3 网络通信协议1、OSI参考模型和TCP/IP参考模型2、UDP协议3、TCP协议 2.4 Socket编程 3 UDP网络编程3.1 DatagramSocket和DatagramPacket1、Datagram…

强大的文本绘图——PlantUML

PlantUML是一款开源工具,它允许用户通过简单的文本描述来创建UML图(统一建模语言图)。这种方法可以快速地绘制类图、用例图、序列图、状态图、活动图、组件图和部署图等UML图表。PlantUML使用一种领域特定语言(DSL)&am…

PostMan使用自带js库base64编码、sha256摘要、环境变量的使用

目录 1、环境变量的使用2、base64编码、sha256摘要、以及脚本的使用3、脚本代码 在请求调试接口的过程中,因为要使用大量相同的参数,使用变量的方式能很大程度上减轻接口调用的工作量 版本说明:Postman for Windows,Version&#…

BUGKU-WEB 备份是个好习惯

题目描述 题目截图如下: 进入场景看看: 解题思路 看源码看提示:备份是个好习惯扫描目录md5弱比较 相关工具 御剑md5解密:https://www.somd5.com/ 解题步骤 看到的这串字符,有点像md5? d41d8cd98…

【PCL】(十二)使用ConditionalRemoval或RadiusOutlierRemoval滤波器对点云进行滤波

(十二)使用ConditionalRemoval 或 RadiusOutlierRemoval滤波器对点云进行滤波 RadiusOutlierRemove滤波器删除PointCloud中在指定半径的邻域范内,邻点没能达到指定数量的点。下图中,如果指定了邻点数为1,则黄色点将从…

亿道丨三防平板电脑厂家推荐丨三防平板PAN智能化赋能

随着科技的不断进步和人们对智能化产品的需求日益增长,三防平板迎来了智能化赋能的时代。通过融合创新科技,三防平板实现了更高的性能、更智能的功能以及更广泛的应用场景,引领着未来的发展潮流。 一、智能化技术提升性能 随着技术的进步&…

Nginx网络服务四-----日志、Nginx压缩和ssl

1.自定义访问日志 如果访问出错---404,可以去看error.log日志信息 访问日志是记录客户端即用户的具体请求内容信息,而在全局配置模块中的error_log是记录nginx服务器运行时的日志保存路径和记录日志的level,因此两者是不同的,而且…

openssl 生成nginx自签名的证书

1、命令介绍 openssl req命令主要的功能有,生成证书请求文件, 查看验证证书请求文件,还有就是生成自签名证书。 主要参数 主要命令选项: -new :说明生成证书请求文件 -x509 :说明生成自签名证书 -key :指定已…

计算机网络面经-从浏览器地址栏输入 url 到显示主页的过程?

大概的过程比较简单,但是有很多点可以细挖:DNS解析、TCP三次握手、HTTP报文格式、TCP四次挥手等等。 DNS 解析:将域名解析成对应的 IP 地址。TCP连接:与服务器通过三次握手,建立 TCP 连接向服务器发送 HTTP 请求服务器…

JS基础(一)

一 JS概述 1. 历史 1995年 JS出现在网景浏览器中 1996年 IE也开始出现了JS 1997年 指定了JS的标准规范ECMAScript 目前是ES6 2009年 JS开始向后端发展&#xff0c;出现了Node.js 二 JS入门 1. JS的运行方式 1. 内部写法 <script>J…

查看navicat保存的数据库连接密码

背景 经常使用navicat的朋友可能会碰到忘记数据库连接密码的情况&#xff0c;自然会想到navicat连接配置中就保存了密码。 个人经验&#xff0c;按以下步骤可查看密码明文 本人在mac上使用的navicat版本 1&#xff0c;导出connection_local.ncx 点击OK导出保存为connection_l…

iOS面试:4.多线程GCD

一、多线程基础知识 1.1 什么是进程&#xff1f; 进程是指在系统中正在运行的一个应用程序。对于电脑而已&#xff0c;你打开一个软件&#xff0c;就相当于开启了一个进程。对于手机而已&#xff0c;你打开了一个APP&#xff0c;就相当于开启了一个进程。 1.2 什么是线程&am…

【电子书】人工智能

资料 wx&#xff1a;1945423050&#xff0c;备注来源和目的 个人整理了一些互联网电子书 人工智能 Julia机器学习核心编程&#xff1a;人人可用的高性能科学计算.epubKeras深度学习实战.epubMATLAB图像与视频处理实用案例详解.epubMATLAB金融算法分析实战&#xff1a;基于机器…

Java学习笔记2024/2/23

今日内容 抽象类 接口 内部类 教学目标 能够写出抽象类的格式能够写出抽象方法的格式能说出抽象类的应用场景写出定义接口的格式写出实现接口的格式说出接口中成员的特点能说出接口的应用场景能说出接口中为什么会出现带有方法体的方法能完成适配器设计模式 第一章 抽象类…

c语言--typedef关键字

目录 一、typedef关键字是用来干嘛的&#xff1f;二、用法 一、typedef关键字是用来干嘛的&#xff1f; typedef 是⽤来类型重命名的&#xff0c;可以将复杂的类型&#xff0c;简单化。 二、用法 比如&#xff0c;你觉得 unsigned int 写起来不方便&#xff0c;如果能写成 u…

[网鼎杯 2020 白虎组]PicDown

网鼎杯的&#xff0c;这应该是送分的那种 界面很普通&#xff0c;就长这样。源代码也没什么&#xff0c;随便输入试试 出现了"/page?url1" 这可能是ssrf题目。 但是尝试了一些payload发现下载了一张图片&#xff0c;并且url里自动补齐了127.0.0.1。使用记事本打开…

MySQL数据库基础(十三):关系型数据库三范式介绍

文章目录 关系型数据库三范式介绍 一、什么是三范式 二、数据冗余 三、范式的划分 四、一范式 五、二范式 六、三范式 七、总结 关系型数据库三范式介绍 一、什么是三范式 设计关系数据库时&#xff0c;遵从不同的规范要求&#xff0c;设计出合理的关系型数据库&…