【初始RabbitMQ】高级发布确认的实现

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢

发布确认SpringBoot版本

确认机制方案

代码架构图

配置文件

spring.rabbitmq.host=118.31.6.132
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

在配置文件当中需要添加 spring.rabbitmq.publisher-confirm-type=correlated

  • NONE 禁用发布确认模式,是默认值
  • CORRELATED 发布消息成功到交换器后会触发回调方法
  • SIMPLE 经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>springboot-rabbitmq</artifactId><version>0.0.1-SNAPSHOT</version><name>demo</name><description>Demo project for Spring Boot</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId><version>3.2.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

添加配置类

package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//声明确认队列@Beanpublic Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认交换机@Beanpublic DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue")Queue queue,@Qualifier("confirmExchange")DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("key1");}
}

回调接口

package com.example.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id = correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}
}

消息生产者

package com.example.controller;import com.example.component.MyCallBack;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/confirm")
@Slf4j
public class producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyCallBack myCallBack;@PostConstruct //在类实例被创建后(通过依赖注入完成,并且在任何其他初始化方法之前),容器会自动调用这个方法public void init(){rabbitTemplate.setConfirmCallback(myCallBack);}@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message){//指定消息id为1CorrelationData correlationData1 = new CorrelationData("1");String routingKey = "key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);log.info("发送消息内容:{}",message+routingKey);CorrelationData correlationData2 = new CorrelationData("2");routingKey = "key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);log.info("发送消息内容:{}",message+routingKey);}
}

消息消费者

package com.example.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues = CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}

结果分析

访问:http://127.0.0.1:8080/confirm/sendMessage/%E4%BD%A0%E5%A5%BD

 

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为 第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条 消息被直接丢弃了 

回退消息

Mandatory参数

在仅开启了生产者确认机制的情况下,交换还击接收到消息之后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者

消息生产者代码

package com.example.controller;import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;@RestController
@Slf4j
@RequestMapping("/confirm")
public class MessageProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructprivate void init(){rabbitTemplate.setConfirmCallback(this);/*** true:交换机无法将消息进行路由时,会将消息返回给生产者* false:如果发现纤细无法进行路由时,直接的丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息给谁处理rabbitTemplate.setReturnsCallback(this);}@GetMapping("/sendMessage/{message}")public void sendMessage(@PathVariable String message){//让消息绑定一个id值CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("confirm.exchange","key1",message+"key1",correlationData1);log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData2);log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");}@Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机收到消息确认成功, id:{}", id);} else {log.error("消息 id:{}未成功投递到交换机,原因是:{}", id, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());}
}

 回调接口

package com.example.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData,boolean ack,String cause){String id = correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),returnedMessage.getExchange(), returnedMessage.getRoutingKey());}
}

 消息消费者

package com.example.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues = CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}

结果分析

备份交换机

备份交换机是 RabbitMQ 中的一个机制,用于处理无法被路由到队列的消息。它可以看作是交换机的“备胎”,当交换机接收到无法路由的消息时,会将这些消息转发到备份交换机中,由备份交换机来进行处理

通常情况下,备份交换机的类型是 Fanout,这意味着它会将所有接收到的消息广播给与其绑定的所有队列。因此,当备份交换机接收到无法路由的消息时,这些消息会被发送到与备份交换机绑定的队列中

通过设置备份交换机,我们可以将无法被路由的消息存储到一个特定的队列中,然后可以通过监控这个队列来进行报警或者手动处理。这样做既可以避免丢失消息,又不会增加生产者的复杂性,同时也提高了系统的稳定性和可靠性

代码架构图

 修改配置类

package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";//声明确认队列@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue")Queue queue,@Qualifier("confirmExchange")DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("key1");}//声明备份交换机@Beanpublic FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认交换机并且绑定备份交换机@Beanpublic DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备用交换机.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);return (DirectExchange) exchangeBuilder.build();}//声明警告队列@Beanpublic Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchangebackupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
}

报警消费者

package com.example.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RestController;@RestController
@Slf4j
public class WarningConsumer {public static final String WARNING_QUEUE_NAME = "warning.queue";@RabbitListener(queues = WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message) {String msg = new String(message.getBody());log.error("报警发现不可路由消息:{}", msg);}
}

 测试注意事项

直接运行会报错,因为我们之前创建过此队列,如要修改只能删除,或者新建队列

我们可以登录管理后台,删除该队列 

结果分析

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2804572.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

浅谈集群的分类

本文主要介绍集群部署相关的知识&#xff0c;介绍集群部署的基础&#xff0c;集群的分类、集群的负载均衡技术&#xff0c;集群的可用性以及集群的容错机制。随后介绍Redis-Cluster以及Mysql的架构以及主从复制原理。 集群介绍 单台服务器本身会受到带宽、内存、处理器等多方面…

万界星空科技电子机电行业MES系统,2000元/年起

电子行业在生产管理上具有典型的离散制造特点&#xff0c;采用多品种、多批量或单件的生产组织方式。产品升级换代迅速&#xff0c;生命周期短&#xff0c;变更频繁&#xff0c;版本控制复杂。 同时产品的种类较多&#xff0c;非标准产品多&#xff0c;加工工序复杂&#xff0…

使用MongoDB数据库和Mongoose库在Node.js中进行数据存储

在Node.js中使用MongoDB数据库和Mongoose库进行数据存储是前端开发中常用的技术之一。MongoDB是一种非关系型数据库&#xff0c;具有高性能、易扩展等优点&#xff1b;而Mongoose是在Node.js中对MongoDB进行操作的框架&#xff0c;简化了数据库操作&#xff0c;并提供了丰富的功…

Leetcode日记 2583. 二叉树中的第 K 大层和

Leetcode日记 2583. 二叉树中的第 K 大层和 题目&#xff1a;解题思路&#xff1a;代码实现制作不易&#xff0c;感谢三连&#xff0c;谢谢啦 题目&#xff1a; 给你一棵二叉树的根节点 root 和一个正整数 k 。 树中的 层和 是指 同一层 上节点值的总和。 返回树中第 k 大的层和…

QT常用类

五、常用类 QString 字符串类&#xff08;掌握&#xff09; QString是Qt的字符串类&#xff0c;与C的std::string相比&#xff0c; 不再使用ASCII编码。QString使用的是Unicode编码。 QString中每个字符都是一个16位的QChar&#xff0c;而不是8位的char。 QString完全支持中文&…

动态预测波动率:ARCH模型和Heston模型

制定符合需要的资产组合需要了解每支的波动率&#xff0c;波动率高的资产意味着价格波动大&#xff0c;风险高&#xff0c;为了降低资产组合的风险&#xff0c;通常会在波动率较低的资产中分配更多的资金。同时波动率也和市场参与者的情绪有关&#xff0c;波动率大&#xff0c;…

【算法与数据结构】684、685、LeetCode冗余连接I II

文章目录 一、684、冗余连接 I二、685、冗余连接 II三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、684、冗余连接 I 思路分析&#xff1a;题目给出一个无向有环图&#xff0c;要求去掉一个边以后构成一个树&#xf…

如何在VSCode中带有参数的Debug(name、program、$file、args、pickArgs、指定虚拟环境)

0. 省流 {"version": "0.2.0","configurations": [{"name": "调试train.py文件","type": "debugpy","request": "launch","program": "train.py","cons…

如何改变.net托管的入口main函数

有小伙伴问: .NET托管入口Main函数可以修改成别的函数&#xff0c;用来作为程序的入口吗&#xff1f; 答案&#xff1a;当然是可以的。这也算是.NET里面非常简单的骚操了。本篇来用最新的.NET8演示下&#xff0c;如何修改Main入口。 1.简单控制台例子&#xff1a; namespace…

美国硅谷大带宽服务器|大带宽服务器租赁贵吗?

在数字化时代&#xff0c;服务器成为了支撑各种在线业务和应用程序的重要基石。尤其对于那些需要处理大量数据、保证快速响应和稳定连接的企业或个人来说&#xff0c;大带宽服务器成为了不可或缺的选择。而美国硅谷&#xff0c;作为全球科技创新的摇篮&#xff0c;其服务器租赁…

Open CASCADE学习|绘制砂轮

今天绘制一个砂轮&#xff0c;其轮廓由两条直线段和两段圆弧构成&#xff0c;圆弧分别与直线相切&#xff0c;两条圆弧之间相交而非相切。建模思路是&#xff1a;先给定两条直线段的起始点及长度&#xff0c;画出直线段&#xff0c;然后给定其中一圆弧的半径及圆心角&#xff0…

python程序设计基础:字符串与正则表达式

第四章&#xff1a;字符串与正则表达式 4.1字符串 最早的字符串编码是美国标准信息交换码ASCII&#xff0c;仅对10个数字、26个大写英文字母、26个小写英文字母及一些其他符号进行了编码。ASCII码采用1个字节来对字符进行编码&#xff0c;最多只能表示256个符号。 随着信息技…

【新手易错点】golang中byte和rune

1 总体区别 在Golang中&#xff0c;byte和rune是两种不同类型的数据。简单来说&#xff0c;byte是一个8位的无符号整数类型&#xff0c;而rune则是一个32位的Unicode字符类型。 Byte: 在Golang中&#xff0c;byte类型实际上是uint8的别名&#xff0c;它用来表示8位的无符号整…

flutter使用getx实现路由跳转,页面没有执行dispose

我们看一下flutter的StatefulWidget组件的生命周期&#xff1a; createState&#xff1a; 当一个StatefulWidget插入到渲染树结构、或者从渲染树结构移除时&#xff0c;都会调用StatefulWidget.createState方法&#xff0c;从而达到更新UI的效果&#xff1b; initState&#…

【刷题记录】链表的回文结构

本系列博客为个人刷题思路分享&#xff0c;有需要借鉴即可。 1.题目链接&#xff1a; LINK 2.详解思路&#xff1a; 思路&#xff1a;思路&#xff1a;先找到中间节点&#xff0c;然后逆置后半部分链表&#xff0c;一个指针指向链表的头节点&#xff0c;再一个指针指向逆置的头…

怎么在wifi中实现手机和电脑文件互传

有时我们想手机电脑文件互传&#xff0c;数据线却不在身边&#xff0c;这时我们可以用MiXplorer来实现wifi中手机和电脑互相访问文件。 MiXplorer是一款来自著名安卓开发者论坛XDA的作品&#xff0c;免费且功能强大&#xff0c;被很多人誉为是“全能文件管理器”。 1.在手机上…

Golin 弱口令/漏洞/扫描/等保/基线核查的快速安全检查小工具

下载地址&#xff1a; 链接&#xff1a;https://pan.quark.cn/s/db6afba6de1f 主要功能 主机存活探测、漏洞扫描、子域名扫描、端口扫描、各类服务数据库爆破、poc扫描、xss扫描、webtitle探测、web指纹识别、web敏感信息泄露、web目录浏览、web文件下载、等保安全风险问题风险…

进程线程通信-day6

1、将信号和消息队列的课堂代码敲一遍 //发送端 #include<myhead.h>//定义一个消息结构类型 struct msgbuf {long mtype;char mtext[1024]; }; //定义一个宏&#xff0c;表示消息正文大小 #define MSGSIZE sizeof(struct msgbuf)-sizeof(long)int main(int argc, const…

CSS实现半边边框(只有边框的部分可见)

CSS实现半边边框&#xff08;只有边框的部分可见&#xff09; <div class"part box"><h1>内容</h1><!-- 绘出下面两个对角边框--><div class"part-footer"></div> </div>主要代码 .box {width: 100px;height:…

cmake 项目。qt5升级 qt6 报错 error: “Qt requires a C++17 compiler 已解决

日常项目开发中。需要对qt5升级到qt6 做cmake兼容配置&#xff0c;在编译中发现&#xff0c;有c 编译环境 报错 2>C:\Qt\6.5.3\msvc2019_64\include\QtCore/qcompilerdetection.h(1226,1): fatal error C1189: #error: "Qt requires a C17 compiler, and a suitable …