实战:Spring Cloud Stream消息驱动框架整合rabbitMq

文章目录

    • 前言
    • Spring Cloud Stream简析
    • Spring Cloud Stream与rabbitmq整合
      • 1、添加pom依赖
      • 2、application.yml增加mq配置
      • 3、定义输入输出信道
      • 4、使用输入输出信道收发消息
      • 5、模拟正常消息消费
      • 6、模拟异常消息

前言

相信很多同学都开发过WEB服务,在WEB服务的开发中一般是通过缓存、队列、读写分离、削峰填谷、限流降级等手段来提高服务性能和保证服务的正常投用。对于削峰填谷就不得不用到我们的MQ消息中间件,比如适用于大数据的kafka,性能较高支持事务活跃度高的rabbitmq等等,MQ的选用和整合已经是JAVA WEB开发中不可或缺对的一部分。当然,作为号称JAVA微服务框架全家桶的Spring Cloud也提供了良好的适配中间件的功能。今天我们就来整合一下微服务全家桶Spring Cloud提供的消息驱动——Spring Cloud Stream。

Spring Cloud Stream简析

Spring Cloud Stream是用于构建微服务具有消息驱动能力的框架,应用程序通过inputs、outputs通道与binder进行交互,binder与消息中间件进行通信。

binder的作用是将消息中间件进行粘合,相当于对第三方中间件进行封装整合,让开发人员不用关心底层消息中间件如何运行。
在这里插入图片描述

inputs是消息输入通道,类似于消息中间件的consumer消费者;outputs是消息输出通道,类似于消息中间件的producer生产者。应用程序收发消息不再直接调用消息中间件的接口或者逻辑代码,直接使用Spring Cloud Stream 的OUTPUT与INPUT通道进行处理。

可以通过binder绑定选用各种消息中间件,用binding进行中间件的相关参数配置,让应用程序达到灵活配置和切换消息中间件的目的。

Spring Cloud Stream与rabbitmq整合

本次整合直接与rabbitmq整合,如果是使用kafka的同学,可以直接移植配置修改对应粘接mq即可。

本次整合加入了消费重试机制、死信队列,并提供死信队列消费监听方法,可直接移植到生产环境。

1、添加pom依赖

引入spring-cloud-starter-stream-rabbit 需要从Spring Cloud中引入,注意dependencyManagement的配置。

<properties><java.version>1.8</java.version><spring-cloud.version>Hoxton.SR10</spring-cloud.version>
</properties><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
</dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

2、application.yml增加mq配置

spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: adminvirtual-host: /cloud:stream:binders: #stream框架粘接的mqmyRabbit: #自定义个人mq名称type: rabbitenvironment:spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: adminvirtual-host: /bindings: #stream绑定信道output_channel: #自定义发送信道名称destination: assExchange #目的地 交换机/主题content-type: application/jsonbinder: myRabbit #粘接到的mqgroup: assGroupinput_channel: #自定义接收信道destination: assExchange #目的地 交换机/主题content-type: application/jsonbinder: myRabbit #粘接到的mqgroup: assGroupconsumer:maxAttempts: 3 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10srabbit: #stream mq配置bindings:input_channel:consumer:concurrency: 1 #消费者数量max-concurrency: 5 #最大消费者数量durable-subscription: true  #持久化队列recovery-interval: 3000  #3s 重连acknowledge-mode: MANUAL  #手动requeue-rejected: false #是否重新放入队列auto-bind-dlq: true #开启死信队列requeueRejected: true #异常放入死信

3、定义输入输出信道

/*** MqChannel* @author senfel* @version 1.0* @date 2023/6/2 15:46*/
public interface MqChannel {/*** 消息目的地 RabbitMQ中为交换机名称*/String destination = "assExchange";/*** 输出信道*/String OUTPUT_CHANNEL = "output_channel";/*** 输入信道*/String INPUT_CHANNEL = "input_channel";/*** 死信队列*/String INPUT_CHANNEL_DLQ = "assExchange.assGroup.dlq";@Output(MqChannel.OUTPUT_CHANNEL)MessageChannel output();@Input(MqChannel.INPUT_CHANNEL)SubscribableChannel input();}

4、使用输入输出信道收发消息

TestMQService

/*** TestMQService* @author senfel* @version 1.0* @date 2023/6/2 15:47*/
public interface TestMQService {/*** 发送消息*/void send(String str);
}

TestMQServiceImpl

/*** TestMQServiceImpl* @author senfel* @version 1.0* @date 2023/6/2 15:49*/
@Service
@Slf4j
@EnableBinding(MqChannel.class)
public class TestMQServiceImpl implements TestMQService {@Resourceprivate MqChannel mqChannel;@Overridepublic void send(String str) {mqChannel.output().send(MessageBuilder.withPayload("测试=========="+str).build());}/*** 接收消息监听* @param message 消息体* @param channel 信道* @param tag 标签* @param death* @author senfel* @date 2023/6/5 9:25* @return void*/@StreamListener(MqChannel.INPUT_CHANNEL)public void process(String message,@Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("message : "+message);if(message.contains("9")){// 参数1为消息的tag  参数2为是否多条处理 参数3为是否重发//channel.basicNack(tag,false,false);System.err.println("--------------消费者消费异常--------------------------------------");System.err.println(message);throw new RuntimeException("抛出异常");}else{System.err.println("--------------消费者--------------------------------------");System.err.println(message);channel.basicAck(tag,false);}}/*** 死信监听* @param message 消息体* @param channel 信道* @param tag 标签* @param death* @author senfel* @date 2023/6/5 14:30* @return void*/@RabbitListener(bindings = @QueueBinding(value = @Queue(MqChannel.INPUT_CHANNEL_DLQ), exchange = @Exchange(MqChannel.destination)),concurrency = "1-5")public void processByDlq(String message,@Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("message : "+message);System.err.println("---------------死信消费者------------------------------------");System.err.println(message);}
}

controller

/*** @author senfel* @version 1.0* @date 2023/6/2 17:27*/
@RestController
public class TestController{@Resourceprivate TestMQService testMQService;@GetMapping("/test")public String testMq(String str){testMQService.send(str);return str;}
}

5、模拟正常消息消费

在这里插入图片描述在这里插入图片描述

6、模拟异常消息

异常消息重试满足3次投递后进入死信消费
在这里插入图片描述
在这里插入图片描述
加粗样式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/351420.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

分享几个实用又有趣的工具类网站

今天来给大家分享几个众多网友们推荐的&#xff0c;实用又有趣的工具类网站 DeepL翻译&#xff1a;免费的在线翻译网站&#xff0c;大名鼎鼎的翻译神器 DeepL&#xff0c;翻译准确、语句通顺&#xff0c;效果相当好。 https://www.deepl.com/translator BgSub 消除或者替换图…

JavaEE(系列20) -- 网络编程之UDP和TCP套接字

目录 1. 网络编程 2. UDP网络编程 2.1 DatagramSocket API 2.2 DatagramPacket API 2.3 基于UDP实现的回显服务器 2.3.1 UDP服务器设计步骤 2.3.2 服务器代码 2.3.3 客户端代码 2.3.4 基于UDP写一个字典服务器 3. TCP网络编程 3.1 ServerSocketAPI 3.2 SocketAPI …

window专业版激活

1、管理员权限进入命令行 2、安装密钥 slmgr /ipk W269N-WFGWX-YVC9B-4J6C9-T83GX 3、设置kms服务器 slmgr /skms zh.us.to 4、查看是否激活 slmgr /ato 转载于:https://www.cnblogs.com/Edward-Yue/p/10942884.html

电脑显示未激活Windows的解决办法

今天遇到一个问题&#xff0c;桌面显示要我激活windows&#xff0c;如下图&#xff1a; 那么接下来就分享一下我的解决办法&#xff1a; 一、鼠标右击桌面左下角windows 键&#xff0c;选择Windows PowerShell&#xff08;管理员&#xff09;&#xff0c;会出现如下页面&#x…

Windows server 2022安装与激活

1.下载 百度网盘下载 链接&#xff1a;https://pan.baidu.com/s/18c5smZPzbk0ClhEYh4LQ2w 提取码&#xff1a;w7i8 2.安装 最近版本新出的镜像官方的 EFI 文件在虚拟机上部署有问题&#xff0c;如发现不能在虚拟机内使用&#xff0c;切换虚拟机配置为 BIOS&#xff0c;而非…

查看已激活Windows的密钥

今天工作之余&#xff0c;过个快乐的周末心情很是不错&#xff0c;然后打开电脑&#xff0c;闲暇之余&#xff0c;突然想到了window密钥的问题&#xff0c;然后就写个文章记录一下Windows如何在命令行查看密钥&#xff01; 方法一&#xff1a;DOS命令查看&#xff1a;如果你是…

Linux内核日志打印时间开关

echo Y > /sys/module/printk/parameters/time //打开内核日志打印时间 echo N > /sys/module/printk/parameters/time //关闭内核日志打印时间 cat /sys/module/printk/parameters/time //查看内核日志打印时间开关状态 static bool printk_time IS_ENABLED(CONFIG…

在 Windows 10 中如何查看系统的激活状态?

查看激活方法 方法一方法二方法三 升级到 Windows 10 或重新安装 Windows 10 后&#xff0c;可能需要检查 Windows 10 安装是否已激活&#xff0c;那么可以通过以下方法来实现&#xff1a; 方法一 开始按钮->设置&#xff08;快捷键 Windows I&#xff09; 点击“更新和…

win 10 系统激活

win10企业版永久激活方法?win10企业版是针对企业用户推出的版本&#xff0c;随着win10系统的不断完善&#xff0c;现在越来越多的人选择升级win10&#xff0c;升级完系统就需要激活它。那么今天就为大家分享一下怎么永久激活win10企业版。 1、右键点击桌面左下角"windows…

激活windows

激活windows 本来一直觉得office不激活也不影响使用&#xff0c;直到最近写报告才发现影响真是很大&#xff0c;搜了好多激活码基本都不可用。直到在某个网站上看到的一个非常简单的一个方式&#xff0c;话不多说了&#xff0c;直接开始。 第一步&#xff1a;下载安装激活工具…

细说Windows系统主流激活的原理与弊端!

Windows系统有必要激活吗&#xff1f; 答案是“有必要”。虽然微软已经放开了系统&#xff0c;不再抓着版权不放&#xff0c;而且可以通过“自建验证服务器”的方式可以自行激活盗版系统&#xff0c;但相较于已经激活的正版系统而言&#xff0c;正版系统提供的功能更齐全&#…

赛效:广告文案自动生成器有哪些

1&#xff1a;在电脑浏览器上打开即时工具&#xff0c;登录自己的账号后&#xff0c;在“智能文本”菜单里点击“广告文案”。 2&#xff1a;在输入框里输入产品类型、产品特性&#xff0c;然后点击下方“生成”。 3&#xff1a;生成广告文案后&#xff0c;点击文案右上角的复制…

JavaSE进阶(day12,复习自用)

网络编程&#xff08;通信&#xff09; 网络通信三要素三要素概述、要素一&#xff1a;IP地址IP地址操作类-InetAddress要素二&#xff1a;端口号要素三&#xff1a;协议 UDP通信-快速入门UDP通信-广播、组播TCP通信-快速入门编写客户端代码编写服务端代码、原理分析 TCP通信-多…

运维小白必学篇之基础篇第十四集:DHCP中继实验

DHCP中继实验 目录 服务器端&#xff1a;&#xff08;vmware5&#xff09; 中继器端&#xff1a;(双网卡ens33、vmware5&#xff1b;ens36、vmware6&#xff09; 客户端&#xff1a;&#xff08;vmware6&#xff09; 实验作业&#xff08;主机名为自己的名字&#xff09;&a…

Bert+FGSM中文文本分类

我上一篇博客已经分别用BertFGSM和BertPGD实现了中文文本分类&#xff0c;这篇文章与我上一篇文章BertFGSM/PGD实现中文文本分类&#xff08;Loss0.5L10.5L2)_Dr.sky_的博客-CSDN博客的不同之处在于主要在对抗训练函数和embedding添加扰动部分、模型定义部分、Loss函数传到部分…

ROS:tf坐标系广播与监听的编程实现

目录 一、创建功能包二、创建代码并编译运行&#xff08;C&#xff09;2.1创建代码2.2编译2.3运行 一、创建功能包 创建的 learning_tf 包来进行代码存放和编译 cd ~/catkin_ws/src catkin_create_pkg learning_tf roscpp rospy tf turtlesim二、创建代码并编译运行&#xff…

视频裁剪

从一段较长的视频中裁剪某一段(时间自己定)&#xff0c;如图 结合api com.github.yangjie10930:EpMedia:v1.0.1 视频裁剪框架 可以实现可视化视频裁剪。 说不多说上源码&#xff1a;https://gitee.com/kbld/video-edit 视频压缩的可以看看https://blog.csdn.net/qq_35198779…

Android 视频裁剪自定义 View

PlaySeekbar - Github Android 视频裁剪自定义 View - 裁减播放的视频&#xff08;本地视频&#xff09; 功能需求与预览 有个视频裁剪功能&#xff0c;需要自定义 View 具体如下 裁剪选择区域模块&#xff0c;可以自定义最少裁剪时间 当选择低于最少裁剪时间时&#xff0c;…

海外网红营销必备:品牌与海外红人合作的谈判技巧指南

随着社交媒体的飞速发展&#xff0c;海外网红已经成为品牌营销的热门选择。与知名红人合作&#xff0c;可以有效地扩大品牌影响力、提升产品认知度&#xff0c;并吸引目标受众。然而&#xff0c;与红人的谈判过程常常充满挑战&#xff0c;需要品牌营销人员具备一定的技巧和策略…

linux裁剪视频教程,适用于Linux桌面的超简单实用的视频裁剪应用

原标题&#xff1a;适用于Linux桌面的超简单实用的视频裁剪应用 来自&#xff1a;Linux迷 https://www.linuxmi.com/video-trimmer-linux.html 您可能已经知道一些适用于Linux 的最佳免费视频编辑器&#xff0c;但并不是每个人都需要那些提供的所有功能。 有时&#xff0c;您只…