第十一章 Stream消息驱动

Stream消息驱动

gitee:springcloud_study: springcloud:服务集群、注册中心、配置中心(热更新)、服务网关(校验、路由、负载均衡)、分布式缓存、分布式搜索、消息队列(异步通信)、数据库集群、分布式日志、系统监控链路追踪。

1. 消息驱动概述

作用:屏蔽底层消息中间件的差异,降低切换成本,统—消息的编程模型。底层不管是什么中间件如kafka、rabbitmq,Stream可以解决不同中间件的通信。 官网:Spring Cloud Stream

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
​
应用程序通过 inputs 或者 outputsj来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
​
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
​
但是Stream只支持kafka、rabbitmq。

img

设计思想 标准的MQ:

1.生产者/消费者之间靠消息媒介传递信息内容:Message
2.消息必须走特定的通道:消息通道MessageChannel
3.消息通道里的消息如何被消费呢,谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

Cloud Stream:

Stream利用Binder来绑定中间件的输入流和输出流。如果系统使用到了两个中间件(kafka、rabbitmq):这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的人—大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

Stream中的消息通信方式遵循了发布-订阅模式:

Topic在Rabbitmq中是Exchange、在kafka中是Topic。

Spring Cloud Stream标准流程套路

img

Middleware:中间件,目前只支持RabbitMQ和Kafka
Binder:是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener:监听队列。用于消费者的队列的消息接收
@EnableBinding:指信道channel和exchange绑定在一起

Binder:很方便的连接中间件,屏蔽差异 Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。 Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2. 消息驱动之生产者

创建cloud-stream-rabbitmq-provider8801:作为生产者进行发消息模块

  1. pom文件

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.yaml

server:port: 8801
spring:application:name: cloud-stream-providercloud:stream:binders: #在此处配置要绑定的rabbitmq的服务信息;defaultRabbit:type: rabbitenvironment:spring:rabbitmq:host: 192.168.25.153port: 5672username: adminpassword: aaaaaabindings: #服务的整合处理output: #destination: studyExchange  #表示要使用的Exchange名称定义content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit #设置要绑定的消.息服务的具体设置
eureka:client:service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eurekaregister-with-eureka: truefetch-registry: trueinstance:lease-renewal-interval-in-seconds: 2lease-expiration-duration-in-seconds: 5instance-id: send-8801.comprefer-ip-address: true
  1. 主启动类

@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8801.class,args);}
}
  1. service层

public interface IMessageProvider {String send();
}
@EnableBinding(Source.class)  //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
​@Resourceprivate MessageChannel output;  //消息发送管道
​@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("********serial:"+serial);return null;}
}
  1. controller层

@RestController
public class SendMessageController {
​@Resourceprivate IMessageProvider messageProvider;
​@GetMapping(value = "/sendMessage")public String sendMessage(){return messageProvider.send();}
}

测试:

3. 消息驱动之消费者

创建cloud-stream-rabbitmq-consumer8802,作为消息接收模块

  1. pom文件

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.yml

server:port: 8802
spring:application:name: cloud-stream-consumercloud:stream:binders: #在此处配置要绑定的rabbitmq的服务信息;defaultRabbit:type: rabbitenvironment:spring:rabbitmq:host: 192.168.25.153port: 5672username: adminpassword: aaaaaabindings: #服务的整合处理input: #destination: studyExchange  #表示要使用的Exchange名称定义content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit #设置要绑定的消.息服务的具体设置
eureka:client:service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eurekaregister-with-eureka: truefetch-registry: trueinstance:lease-renewal-interval-in-seconds: 2lease-expiration-duration-in-seconds: 5instance-id: receive-8802.comprefer-ip-address: true
  1. controller层

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
​@Value("${server.port}")private String serverPort;
​@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1号,------>接收到的消息:"+message.getPayload()+"\t port:"+serverPort);}
}
  1. 主启动类

@SpringBootApplication
@EnableEurekaClient
public class ConsumerMQMain8802 {public static void main(String[] args) {SpringApplication.run(ConsumerMQMain8802.class,args);}
}

测试:

启动loccalhost:8801/sendMessage就可以了,消费者就是一个监听器,有message就消费。

4. 分组消费与持久化

根据cloud-stream-rabbitmq-consumer8802创建8803项目,运行暴露问题:


消息重复消费和消息持久化问题,需要进行分组操作。注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

解决重复消费方法:加入同一个组(下图是不同分组的情况)

cloud-stream-rabbitmq-consumer8802和8803设置不同分组yicaiA/B

server:port: 8803
spring:application:name: cloud-stream-consumercloud:stream:binders: #在此处配置要绑定的rabbitmq的服务信息;defaultRabbit:type: rabbitenvironment:spring:rabbitmq:host: 192.168.25.153port: 5672username: adminpassword: aaaaaabindings: #服务的整合处理input: #destination: studyExchange  #表示要使用的Exchange名称定义content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit #设置要绑定的消.息服务的具体设置group: yicaiB
server:port: 8802
spring:application:name: cloud-stream-consumercloud:stream:binders: #在此处配置要绑定的rabbitmq的服务信息;defaultRabbit:type: rabbitenvironment:spring:rabbitmq:host: 192.168.25.153port: 5672username: adminpassword: aaaaaabindings: #服务的整合处理input: #destination: studyExchange  #表示要使用的Exchange名称定义content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit #设置要绑定的消.息服务的具体设置group: yicaiA

cloud-stream-rabbitmq-consumer8802和8803设置同一个组yicaiA

server:port: 8802
spring:application:name: cloud-stream-consumercloud:stream:binders: #在此处配置要绑定的rabbitmq的服务信息;defaultRabbit:type: rabbitenvironment:spring:rabbitmq:host: 192.168.25.153port: 5672username: adminpassword: aaaaaabindings: #服务的整合处理input: #destination: studyExchange  #表示要使用的Exchange名称定义content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit #设置要绑定的消.息服务的具体设置group: yicaiA

测试:

持久化 加上group就算实现类持久化。所谓的持久化就是如果没有分组,一个服务发送消息,其他服务由于没有分组,如果其他哪些服务断开,又继续重启,这样就会导致以前那些消息丢失。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2660725.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Apache Commons JCS缓存解决方案

第1章&#xff1a;引言 大家好&#xff0c;我是小黑&#xff01;今天&#xff0c;咱们来聊聊Apache Commons JCS&#xff0c;一个Java界里的缓存大杀器。缓存技术&#xff0c;对于提高应用性能来说&#xff0c;就像是给它加了一剂兴奋剂&#xff0c;能让数据访问变得快如闪电。…

Qt(二):使用udp发送与接收图片

使用Qt来通过UDP协议发送和接收图片可以分为几个步骤。以下是一个基本的指南&#xff1a; 发送图片准备图片数据&#xff1a;首先&#xff0c;你需要将图片转换为可以在网络上传输的数据格式。通常&#xff0c;这涉及到将图片转换为字节数组。设置UDP套接字&#xff1a;在Qt中…

OpenCV-Python(21):OPenCV查找及绘制轮廓

1.认识轮廓 1.1 目标 理解什么是轮廓学习掌握找轮廓、绘制轮廓等学习使用cv2.findContours()、cv2.drawContours()函数的用法 1.2 什么是轮廓 在OpenCV中&#xff0c;轮廓是图像中连续的边界线的曲线&#xff0c;具有相同的颜色或者灰度&#xff0c;用于表示物体的形状。轮廓…

linux用户态与内核态通过字符设备交互

linux用户态与内核态通过字符设备交互 简述 Linux设备分为三类&#xff0c;字符设备、块设备、网络接口设备。字符设备只能一个字节一个字节读取&#xff0c;常见外设基本都是字符设备。块设备一般用于存储设备&#xff0c;一块一块的读取。网络设备&#xff0c;Linux将对网络…

web自动化(4)——POM设计重构

1. 什么是POM Page Object Model 是ui自动化测试中常见的封装方式。 原理&#xff1a;将页面封装为PO对象&#xff0c;然后通过面向对象的方式实现UI自动化 2. 封装原则 PO无需包含全部UI元素PO应当验证元素PO不应该包含断言PO不应该暴露元素 3. 怎么进行POM封装 面向对象…

Centos7:Jenkins+gitlab+node项目启动(2)

Centos7&#xff1a;Jenkinsgitlabnode项目启动(1) Centos7&#xff1a;Jenkinsgitlabnode项目启动(1)-CSDN博客 Centos7&#xff1a;Jenkinsgitlabnode项目启动(2) Centos7&#xff1a;Jenkinsgitlabnode项目启动(2)-CSDN博客 Centos7&#xff1a;Jenkinsgitlabnode项目启…

自动化网络故障修复管理

什么是故障管理 故障管理是网络管理的组成部分&#xff0c;涉及检测、隔离和解决问题。如果实施得当&#xff0c;网络故障管理可以使连接、应用程序和服务保持在最佳水平&#xff0c;提供容错能力并最大限度地减少停机时间。专门为此目的设计的平台或工具称为故障管理系统。 …

目标检测损失函数:IoU、GIoU、DIoU、CIoU、EIoU、alpha IoU、SIoU、WIoU原理及Pytorch实现

前言 损失函数是用来评价模型的预测值和真实值一致程度&#xff0c;损失函数越小&#xff0c;通常模型的性能越好。不同的模型用的损失函数一般也不一样。损失函数主要是用在模型的训练阶段&#xff0c;如果我们想让预测值无限接近于真实值&#xff0c;就需要将损失值降到最低…

vue3(十)-基础入门之Swiper轮播与自定义指令

一、Swiper html : 注意&#xff1a; class“swiper-wrapper”、class“swiper-slide” 等类名不能写错 <body><!-- 导入下载好的包或通过 CDN 导入vue、swiper.js、swiper.css --><!-- <script src"https://unpkg.com/vue3/dist/vue.global.js"&…

RK3568平台开发系列讲解(Linux系统篇)PWM系统编程

🚀返回专栏总目录 文章目录 一、什么是PWM二、PWM相关节点三、PWM应用编程沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将介绍 PWM 的系统编程。 一、什么是PWM PWM,即脉冲宽度调制(Pulse Width Modulation)

【PowerMockito:编写单元测试过程中采用when打桩失效的问题】

问题描述 正如上图所示&#xff0c;采用when打桩了&#xff0c;但是&#xff0c;实际执行的时候还是返回null。 解决方案 打桩时直接用any() 但是这样可能出现一个mybatisplus的异常&#xff0c;所以在测试类中需要加入以下代码片段&#xff1a; Beforepublic void setUp() …

Awesome Chrome Form UI - 框架设计与基础实现

Money is not evil by itself. Its just paper with perceived value to obtain other things we value in other ways. If not money what is evil you may ask? Evil is the unquenchable, obsessive and moral bending desire for more. Evil is the bottomless,soulless …

解决VSCode中C/C++ Project Generator插件创建的项目只能运行单个程序的问题

初六&#xff0c;履霜&#xff0c;坚冰至。 释意&#xff1a;初六&#xff0c;当你踩着微霜之时&#xff0c;严寒与坚冰也就即将到来。 目录 一、前言 二、问题描述 三、解决方案 1、思路总结 2、思考过程 3、解决方案&#xff08;直接用&#xff0c;报错找我(&#xff8…

超声功率放大器怎么用

超声功率放大器是一种用于放大超声信号的设备&#xff0c;广泛应用于医疗领域、工业领域和科学研究中。它能够将超声信号的能量增加到足够大的水平&#xff0c;以便进行高强度超声疗法、材料加工和实验研究等应用。下面将详细介绍超声功率放大器的使用方法和其工作原理。 首先&…

数据结构——红黑树 and B-树

红黑树 根据平衡条件第4、5两点 最短路径&#xff0c;都是黑色 最长路径&#xff0c;红黑相间 最长是最短的两倍 B-树

《深入理解Java虚拟机(第三版)》读书笔记:Java内存区域与内存溢出异常、垃圾收集器与内存分配策略

下文是阅读《深入理解Java虚拟机&#xff08;第3版&#xff09;》这本书的读书笔记&#xff0c;如有侵权&#xff0c;请联系删除。 文章目录 第2章 Java内存区域与内存溢出异常2.2 运行时数据区域2.3 HotSpot虚拟机对象探秘 第3章 垃圾收集器与内存分配策略3.2 对象已死&…

安装Node修改Node镜像地址搭建Vue脚手架创建Vue项目

1、安装VSCode和Node 下载VSCode Visual Studio Code - Code Editing. Redefined 下载Node Node.js (nodejs.org) 检验是否安装成功&#xff0c;WinR,输入cmd命令&#xff0c;使用node -v可以查看到其版本号 2、修改镜像地址 安装好node之后&#xff0c;开始修改镜像地址 …

【 YOLOv5】目标检测 YOLOv5 开源代码项目调试与讲解实战(4)-自制数据集及训练(使用makesense标注数据集)

如何制作和训练自己的数据集 看yolov5官网创建数据集1.搜索需要的图片2.创建标签标注数据集地址&#xff1a;放入图片后选择目标检测创建文档&#xff0c;每个标签写在单独的一行上传结果此处可以编辑类别把车框选选择类别即可导出数据 3.新建一个目录放数据写yaml文件 4. 测试…

重装系统以后无法git跟踪

总结&#xff1a;权限问题 故障定位 解决方案&#xff1a; 复制一份新的文件夹。&#xff08;新建的文件创建和写入权限都变了&#xff09; 修改文件为新的用户 执行提示的命令

docker +gitee+ jenkins +maven项目 (一)

jenkins环境和插件配置 文章目录 jenkins环境和插件配置前言一、环境版本二、jenkins插件三、环境安装总结 前言 现在基本都是走自动化运维&#xff0c;想到用docker 来部署jenkins &#xff0c;然后jenkins来部署java代码&#xff0c;做到了开箱即用&#xff0c;自动发布代码…