RocketMQ入坑指南(五):SpringBoot集成RocketMQ和具体使用方式

前言

经过前面几部分的教程,大家应该已经对RocketMQ有了一个全面的认识,建议仔细阅读前几章的内容,可以更好的理解这次的内容,接下来,我们通过代码来演示一下SpringBoot如何集成并使用RocketMQ发送消息

一、SpringBoot集成RocketMQ

集成很简单,只需要导入RocketMQ的starter包即可

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version>
</dependency>
// 工具类方便打印信息
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.12</version>
</dependency>

在配置文件中配置相关参数

生产者

# 应用服务 WEB 访问端口
server:port: 8088
rocketmq:
# 多个nameserver用分号隔开name-server: 192.168.0.8:9876#生产者producer:# 生产者组名group: producer_group# 异步发送消息的重试次数retry-times-when-send-async-failed: 5

消费者:

# 应用服务 WEB 访问端口
server:port: 8081
rocketmq:
# 多个nameserver用分号隔开name-server: 192.168.0.8:9876# 消费者consumer:# 组名group: consumer_group# 消息模式 集群模式message-model: CLUSTERING
spring:application:name: consumer

二、发送普通消息

1.同步发送消息

同步消息是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。

2.异步发送消息

异步消息是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。RocketMQ 异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。

3.单步发送消息

发送⽅只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。

具体代码如下:

生产者:

import cn.hutool.core.date.DateUtil;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.Date;@RestController
@RequestMapping("/rocketmq")
public class ProducerController {@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 发送普通消息* @return*/@RequestMapping("/testNormalMessage")public String testNormalMessage() throws Exception{//发送10条消息for (int i = 1; i <= 10; i++) {String msg = "这是第"+i+"个订单";//普通同步消息rocketMQTemplate.syncSend("TopicOrder",MessageBuilder.withPayload("同步消息:"+msg));//普通异步消息rocketMQTemplate.asyncSend("TopicOrder", MessageBuilder.withPayload("异步消息:"+msg), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功:"+sendResult.toString());}@Overridepublic void onException(Throwable throwable) {System.out.println("发生异常:"+throwable.toString());}});//单步发送rocketMQTemplate.sendOneWay("TopicOrder",MessageBuilder.withPayload("单步发送消息:"+msg));Thread.sleep(1000);}return "";}}

消费者:

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@RocketMQMessageListener(topic = "TopicOrder",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "*")
@Component
public class ConsumerNormal implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {try {System.out.println("收到信息:"+new String(messageExt.getBody()));} catch (Exception e) {throw new RuntimeException(e);}}
}

结果:

可以看到消费者收到了生产者发送的消息

三、发送顺序消息

生产者:

 /*** 发送顺序通知* @return*/@RequestMapping("/testSyncOrderMessage")public String testSyncOrderMessage() throws Exception{//发送10个订单,每个订单发送3条记录数据for (int i = 1; i <= 10; i++) {for (int j = 1; j <= 3; j++) {String msg = "这是第"+i+"个订单";switch (j){case 1:msg += ",用于生成订单";break;case 2:msg += ",用来生成支付记录";break;case 3:msg += ",用来生成物流记录";break;}//自定义队列选择器
//                rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
//                    @Override
//                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//                        return list.get(o.hashCode() % 2);
//                    }
//                });SendResult topicOrderOrderly = rocketMQTemplate.syncSendOrderly("TopicOrder_Orderly", msg, i + "");System.out.println("发送信息:"+topicOrderOrderly.toString());Thread.sleep(1000);}}return "";}

消费者:

@RocketMQMessageListener(topic = "TopicOrder_Orderly",consumerGroup = "orderly_group",consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderly implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {try {System.out.println("收到信息:队列ID:"+messageExt.getQueueId()+" 内容:"+new String(messageExt.getBody()));} catch (Exception e) {throw new RuntimeException(e);}}
}

我们可以启动两个消费者,观察多个消费者消费的数据情况,是否相同队列的数据由同一个消费者进行处理

结果:

可以从结果看出,相同订单号的订单由一个消费者按顺序消费,默认4个队列,两个消费者各承担了两个队列的消息

四、发送延迟消息

延迟等级前面的教程中介绍过,下面贴出来帮大家回忆一下

代码如下:

生产者:

/*** 发送延迟消息* @return*/@RequestMapping("/testDelayMessage")public String testDelayMessage() throws Exception{//发送延迟消息SendResult sendResult = rocketMQTemplate.syncSend("TopicDelay", //TopicMessageBuilder.withPayload("这是一条延迟10S的消息").build(), //发送的消息主体3000, //timeout时间3   //延迟等级,对应16个默认延迟时间);System.out.println(sendResult.toString());System.out.println(DateUtil.formatTime(new Date()));return "";}

消费者:

@RocketMQMessageListener(topic = "TopicDelay",consumerGroup = "delay_group",selectorExpression = "*")
@Component
public class ConsumerDelay implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {try {System.out.println("收到信息"+messageExt.toString());System.out.println(DateUtil.formatTime(new Date()));} catch (Exception e) {throw new RuntimeException(e);}}
}

结果:

生产者:

消费者:

五、发送事务消息(重要)

事务消息的整体逻辑跟前面章节介绍的方式相同,现在发送3条模拟消息分别代表

1.模拟成功消息

2.模拟二次确认失败消息

3.模拟二次确认结果未知,主动查询消息之后返回成功

预想的结果应该是第一和第三种可以成功消费

下面是测试代码:

1.新增生产者监听类,用于二次确认事务处理,和提供主动查询逻辑

import cn.hutool.core.date.DateUtil;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import java.nio.charset.StandardCharsets;
import java.util.Date;@RocketMQTransactionListener
public class OrderTransMsgListener implements RocketMQLocalTransactionListener {// 事务消息共有三种状态,提交状态、回滚状态、中间状态:// RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。// RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。// RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {if("commit".equals(o)){System.out.println("二次确认成功:"+new String((byte[])message.getPayload(), StandardCharsets.UTF_8));return RocketMQLocalTransactionState.COMMIT;}if("rollback".equals(o)){System.out.println("二次确认失败:"+new String((byte[])message.getPayload(), StandardCharsets.UTF_8));return RocketMQLocalTransactionState.ROLLBACK;}if("unknown".equals(o)){System.out.println("二次确认未知:"+new String((byte[])message.getPayload(), StandardCharsets.UTF_8));return RocketMQLocalTransactionState.UNKNOWN;}return null;}/*** 检查本地事务的状态* 回查间隔时间:系统默认每隔60秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。* 第一次消息回查最快*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {System.out.println("收到事务查询回调的时间:" + DateUtil.formatDateTime(new Date()));System.out.println("信息内容:"+new String((byte[])message.getPayload(), StandardCharsets.UTF_8));return RocketMQLocalTransactionState.COMMIT;}
}

2.生产者

/*** 发送事务通知* @return*/@RequestMapping("/testTransMessage")public String testTransMessage() throws Exception{//模拟正常提交消息SendResult sendResult1 = rocketMQTemplate.sendMessageInTransaction("TopicTrans",MessageBuilder.withPayload("这是正常的消息").build(),"commit");System.out.println(sendResult1.toString());//模拟需要回滚的消息SendResult sendResult2 = rocketMQTemplate.sendMessageInTransaction("TopicTrans",MessageBuilder.withPayload("这是错误的消息").build(),"rollback");System.out.println(sendResult2.toString());//模拟生产者发送成功,二次确认失败,broker主动查询成功的情况SendResult sendResult3 = rocketMQTemplate.sendMessageInTransaction("TopicTrans",MessageBuilder.withPayload("这是二次确认失败,主动查询成功的消息").build(),"unknown");System.out.println(sendResult3.toString());return "";}

消费者:

@RocketMQMessageListener(topic = "TopicTrans",consumerGroup = "trans_group")
@Component
public class ConsumerTrans implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {try {System.out.println("收到信息:"+new String(messageExt.getBody())+"时间:"+ DateUtil.formatDateTime(new Date()));} catch (Exception e) {throw new RuntimeException(e);}}
}

然后我们执行发送信息,看一下结果:

生产者:

消费者:

和我们预料的结果相同。

本章内容java源码

 关注我,不迷路

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2815228.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Linux笔记--用户与用户组

Linux系统是一个多用户多任务的操作系统&#xff0c;任何一个要使用系统资源的用户&#xff0c;都必须首先向系统管理员(root)申请一个账号&#xff0c;然后以这个账号的身份进入系统。 用户的账号一方面可以帮助系统管理员对使用系统的用户进行跟踪&#xff0c;并控制他们对系…

Unity中URP下实现水体(水面反射)

文章目录 前言一、原理1、法一&#xff1a;使用立方体纹理 CubeMap&#xff0c;作为反射纹理使用2、法二&#xff1a;使用反射探针生成环境反射图&#xff0c;所谓反射的采样纹理 二、实现水面反射1、定义和申明CubeMap2、反射向量需要什么3、计算 N ⃗ \vec{N} N 4、计算 V ⃗…

【C++私房菜】序列式容器的迭代器失效问题

目录 一、list的迭代器失效 二、vector的迭代器失效 1、空间缩小操作 2、空间扩大操作 三、总结 在C中&#xff0c;当对容器进行插入或删除操作时&#xff0c;可能会导致迭代器失效的问题。所谓迭代器失效指的是&#xff0c;原先指向容器中某个元素的迭代器&#xff0c;在…

STM32_DS18B20_1_芯片简介及初始化配置

DS18B20介绍 DS18B20数字温度计提供9位到12位摄氏度的温度测量&#xff0c;并具有非易失性&#xff0c;用户可编程的上下触发点的报警功能。DS18B20通过1线总线进行通信&#xff0c;根据定义&#xff0c;该总线只需要一条数据线&#xff0c;即可与中央微处理器进行通信…

5G双域快网

目录 一、业务场景 二、三类技术方案 2.1、专用DNN方案 2.2、ULCL方案&#xff1a;通用/专用DNNULCL分流 2.3、 多DNN方案-定制终端无感分流方案 漫游场景 一、业务场景 初期双域专网业务可划分为三类业务场景&#xff0c;学校、政务、文旅等行业均已提出公/专网融合访问需…

每日五道java面试题之spring篇(九)

目录&#xff1a; 第一题. 说一下Spring的事务传播行为第二题. 说一下 spring 的事务隔离&#xff1f;第三题. Spring AOP and AspectJ AOP 有什么区别&#xff1f;AOP 有哪些实现方式&#xff1f;第四题. JDK动态代理和CGLIB动态代理的区别第五题. 解释一下Spring AOP里面的几…

nginx实现http反向代理及负载均衡

目录 一、代理概述 1、代理概念 1.1 正向代理&#xff08;Forward Proxy&#xff09; 1.2 反向代理&#xff08;Reverse Proxy&#xff09; 1.3 正向代理与反向代理的区别 2、同构代理与异构代理 2.1 同构代理 2.2 异构代理 2.3 同构代理与异构代理的区别 二、四层代…

VL817-Q7 USB3.0 HUB芯片 适用于扩展坞 工控机 显示器

VL817-Q7 USB3.1 GEN1 HUB芯片 VL817-Q7 USB3.1 GEN1 HUB芯片 VIA Lab的VL817是一款现代USB 3.1 Gen 1集线器控制器&#xff0c;具有优化的成本结构和完全符合USB标准3.1 Gen 1规范&#xff0c;包括ecn和2017年1月的合规性测试更新。VL817提供双端口和双端口4端口配置&…

Linux NFC 子系统剖析

1.总览 linux源码中NFC在net/nfc下&#xff0c;文件结构如下图&#xff1a; hci&#xff1a;Host Controller Interface 主要是针对NFC的主机-控制器接口协议 nci&#xff1a;NFC Controller Interface 主要是NFC的控制器接口协议&#xff0c;用于NFCC(NFC Controller)和DH(…

【Go语言】Go语言中的切片

Go语言中的切片 1.切片的定义 Go语言中&#xff0c;切片是一个新的数据类型数据类型&#xff0c;与数组最大的区别在于&#xff0c;切片的类型中只有数据元素的类型&#xff0c;而没有长度&#xff1a; var slice []string []string{"a", "b", "c…

GCC的符号可见性: 解决Linux多个库同名符号冲突问题以及引用不同版本库的问题

目录 1 -fvisibilitydefault|internal|hidden|protected 1.1 __attribute__((visibility("default"))) 与 CXXg -fvisibilityhidden 的作用 1.2 __attribute__((visibility("hidden"))) 与 CXXg -fvisibilitydefault的作用 2 我的问题 2.1 解决措…

雾锁王国服务器怎么建?雾锁王国服务器搭建方法

雾锁王国Enshrouded服务器搭建怎么搭建&#xff1f;非常简单&#xff0c;阿里云计算巢雾锁王国程序&#xff0c;可以一键搭建雾锁王国多人联机服务器&#xff0c;腾讯云是基于雾锁王国镜像系统&#xff0c;阿里云服务网aliyunfuwuqi.com汇总雾锁王国服务器搭建&#xff0c;超简…

kafka三节点集群平滑升级过程指导

一、前言 Apache Kafka作为常用的开源分布式流媒体平台&#xff0c;可以实时发布、订阅、存储和处理数据流,多用于作为消息队列获取实时数据&#xff0c;构建对数据流的变化进行实时反应的应用程序&#xff0c;已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型…

DolphinScheduler——工作流实例的生命周期

目录 一、DolphinScheduler架构原理 1.1 系统架构图 1.2 DolphinScheduler核心概念 1.2 创建工作流 1.2.1 如何触发一个工作流实例 1.2.2 任务调度链路监控 1.2.3 Workflow-DAG解析 DAG解析 Dispatch分发流程 Master和Worker的交互过程 1.3 任务运行状态 该篇文章主…

就业班 2401--2.28 Linux Day7--存储管理1

一 .存储管理 主要知识点: 基本分区、逻辑卷LVM、EXT3/4/XFS文件系统、RAID 初识硬盘 机械 HDD 固态 SSD SSD的优势 SSD采用电子存储介质进行数据存储和读取的一种技术&#xff0c;拥有极高的存储性能&#xff0c;被认为是存储技术发展的未来新星。 与传统硬盘相比&#…

Codeforces Round 929 (Div. 3)(A,B,C,D,E,F,G)

这场没考什么算法&#xff0c;比较水&#xff0c;难度也不是很高。比赛链接 硬要说的话E有个 前缀和 加 二分&#xff0c;F是数学BFS&#xff0c;G是个构造 A. Turtle Puzzle: Rearrange and Negate 题意&#xff1a; 给你一个由 n n n 个整数组成的数组 a a a 。您必须对…

Rocky Linux 运维工具yum

一、yum的简介 ​​yum​是用于在基于RPM包管理系统的包管理工具。用户可以通过 ​yum​来搜索、安装、更新和删除软件包&#xff0c;自动处理依赖关系&#xff0c;方便快捷地管理系统上的软件。 二、yum的参数说明 1、install 用于在系统的上安装一个或多个软件包 2、seach 用…

golang使用gorm操作mysql1

1.mysql连接配置 package daoimport ("fmt""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/gorm/logger" )var DB *gorm.DB// 连接数据库&#xff0c;启动服务的时候&#xff0c;init方法就会执行 func init() {username : "roo…

stm32学习笔记:ADC

1 ADC简介 ADC的作用ADC就是一个电压表&#xff0c;把引脚的电压值测出来&#xff0c;放在一个变量里 DAC的作用信号发生器、音频解码芯片 ADC的两个关键参数&#xff1a; 1、分辨率&#xff0c;一般用多少位来表示&#xff0c;12位AD值&#xff0c;它的表示范围就是0~2^12-1&…

Modern C++ std::any为何要求Tp可拷贝构造?

小问题也会影响设计的思路&#xff0c;某个问题或某种case的探讨有助于理解设计的初衷。 声明&#xff1a;以下_Tp/Tp都是指要放入std::any的对象的类型。 它要求_Tp is_copy_constructible, 仅仅是因为有很多函数的实现调用了Tp的拷贝构造函数吗&#xff1f;比如说上节提到的初…