SpringCloud(17)之SpringCloud Stream

一、Spring Cloud Stream介绍

        Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的Spring习惯用法和最佳实践之上,包括对持久发布/子语义、使用者组和有状态分区的支持。  

        它可以基于 Spring Boot来创建独立的、可用于生产的  Spring应用程序,Spring Cloud Stream为一些供应商的消息   中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通   过使用 Spring Cloud Stream ,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。目前 Spring Cloud Stream 支持 RabbitMQ  Kafka 自动化配置。

        目前Spring Cloud Stream只适配以下中间件信息:

二、Spring Cloud Stream 工作流程

        Spring Cloud Stream应用程序由一个与中间件无关的核心组成。应用程序通过在外部代理公开的目的地和代码中的输入/输出参数之间建立绑定来与外部世界通信。建立绑定所需的特定于Broker的详细信息由特定于中间件的Binder实现来处理。

        通过Stream可以很好的屏蔽各个中间件的API差异,它统一了API,生产者通过OUTPUT向消息中间件发 送消息,此时并不需要关心消息中间件是Kafka还是RabbitMQ,不需要关注他们的API,只需要用到Stream的API,这样可以降低学习成本。消费方通过INPUT消费指定的消息,也不需要关注消息中间件 API,架构图如上图: 

        我们对上图的对象进行说明:

  • Application Core:生产者、消费者;
  • inputs:消费者;
  • ouputs:生产者;
  • Binder:绑定器,主要和消息中间件进行绑定操作;
  • Middleware:消息中间件服务;

        

        我们项目中真正应用到Stream,只需要按照如上流程图操作即可;

 生产者:

        1:使用Source绑定消息的输出管道。

        2:通过MessageChannel输出消息。

        3:通过@EnableBinding开启binder,将生产者绑定到指定的MQ服务。

消费者:      

        1:通过@EnableBinding绑定到MQ。

        2:通过Sink绑定到输入数据管道。

        3:@StreamListener监听指定管道数据。

 2.1 Spring Cloud Stream 实战

        

        如上图,当用户行程结束,用户需进入支付操作,当用户支付完成时,我们需要更新订单状态,此时我 们可以让支付系统将支付状态发送到MQ中,订单系统订阅MQ消息,根据MQ消息修改订单状态。我们 将使用 SpringCloud Stream实现该功能。

2.1.1 生产者

1)引入依赖 

   hailtaxi-pay 中引入依赖:

        <!--stream--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

2) 配置MQ服务

 修改 hailtaxi-pay  application.yml 添加如下配置:

server:port: 18083
spring:application:name: hailtaxi-paycloud:#Consul配置consul:host: localhostport: 8500discovery:#注册到Consul中的服务名字service-name: ${spring.application.name}#Streamstream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.168.211.145port: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: payExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

3)消息输出管道绑定 

/**** 负责向MQ发送消息*/
@EnableBinding(Source.class)
public class MessageSender {@Resourceprivate MessageChannel output;//消息发送管道/**** 发送消息* @param message* @return*/public Boolean send(Object message) {//消息发送boolean bo = output.send(MessageBuilder.withPayload(message).build());System.out.println("*******send message: "+message);return bo;}
}

 参数说明:

Source.class:绑定一个输出消息管道Channel。

MessageChannel:发送消息对象,默认是DirectWithAttributesChannel,发消息在 AbstractMessageChannel中完成。

MessageBuilder.withPayload:构建消息。

        此时大家可能会有一个疑问?如果我们多个channel,在rabbitMQ中就是说我一个服务有多个交换机该怎么办?

        我们来看下 Source.class里面定义的内容是什么,定义的内容如下:

public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}

        所以说如果此时我们要新的管道的话,我们就可以参考Source来定义新的类,然后OUTPUT就定义新的管道名称,然后再配置文件中我们就定义这个新的管道名称。 

4)消息发送 

  com.itheima.pay.controller.TaxiPayController 中创建支付方法用于发送消息,代码如下:

    /**** 支付  http://localhost:18083/pay/wxpay/1* @return*/@GetMapping(value = "/wxpay/{id}")public TaxiPay pay(@PathVariable(value = "id")String id){//支付操作TaxiPay taxiPay = new TaxiPay(id,310,3);//发送消息messageSender.send(taxiPay);return taxiPay;}

2.1.2 消费者 

1)修改配置 

 修改 hailtaxi-order 的核心配置文件 application.yml ,在文件中配置要监听MQ信息:

server:port: 18082
spring:application:name: hailtaxi-orderzipkin:#zipkin服务地址base-url: http://localhost:9411sleuth:sampler:probability: 1  #采样值,0~1之间,1表示全部信息都手机,值越大,效率越低cloud:#Consul配置consul:host: localhostport: 8500discovery:#注册到Consul中的服务名字service-name: ${spring.application.name}#Streamstream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.168.211.145port: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: payExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit  # 设置要绑定的消息服务的具体设置group: paygroup #所属分组

2)消息监听 

  hailtaxi-order 中创建消息监听对象 com.itheima.order.mq.MessageReceiver ,代码如下:

@EnableBinding(Sink.class)
public class MessageReceiver {@Value("${server.port}")private String port;/***** 消息监听* @param message*/@StreamListener(Sink.INPUT)public void receive(String message) {System.out.println("消息监听(增加用户积分、修改订单状态)-->" + message+"-->port:"+port);}
}

参数说明:

Sink.class:绑定消费者管道。

@StreamListener(Sink.INPUT):监听消息配置,指定了消息为application中的input


1.3 消息分组

         消息分组有2个好处,分别是集群合理消费、数据持久化。

 1.3.1集群消费下的分组

1)分组的意义

        分组在项目中是有非常重大的意义,通常应用于消息并发高、消息堆积的场景,这些场景服务消费方通 常会做集群操作,一旦做集群操作,我们又需要项目中的消费者合理消费,比如用户打车支付完成后, 我们需要增加用户积分同时修改订单状态,如果集群环境中有2台服务器都执行该消费操作,此时用户  积分会增加两次,就会造成非幂等问题。 

 

        此时集群中相同服务应该属于同一个组,同一个组中只允许有一个足节点消费某一个信息,这样就可以 避免费幂等问题的出现。

2)分组实战 

        新增一个 hailtaxi-order消费者节点:

  

        此时运行起来,  18082  18182 节点会同时消费所有数据。 

        修改 hailtaxi-order 的核心配置文件 application.yml ,添加分组: 

 

        此时再次测试,可以发现消费者不会重复消费数据。 

1.3.2 数据持久化

        我们把分组去掉,停掉 hailtaxi-order 服务,然后请求 http://localhost:18083/pay/wxpay/1 送数据,发送完数据后,再启动 hailtaxi-order服务,此时发现没有数据可以消费,这是因为数据没 有持久化,是一种广播模式,如果需要数据持久化,得给每个消费节点添加group组即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2813725.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Coze开源软件Windows客户端-coze_desk

字节的coze相信大家都已经有所关注了&#xff0c;最近看到很多公众号在推。笔者也在使用&#xff0c;体验很不错。 这个是官网&#xff1a;https://www.coze.com/。 官网版 应用的样子 三栏式布局&#xff0c;用起来还是可以的。 不过这个是在浏览器端&#xff0c;有时候不小…

springcloud -远程调用方式

如果我们要进行远程微服之间的调用该如何完成呢&#xff0c;本文以案例推动以养老系统中老人支付购买商品为例&#xff0c;一步步实现远程微服务的调用。 目标 微服务之间的调用方式 老人支付 common模块设计 统一返回 package com.wnhz.ssc.common.result;import lombok.G…

书生·浦语大模型全链路开源体系介绍

背景介绍 随着人工智能技术的迅猛发展&#xff0c;大模型技术已成为当今人工智能领域的热门话题。2022 年 11 月 30 日&#xff0c;美国 OpenAI 公司发布了 ChatGPT 通用型对话系统 并引发了全球 的极大关注&#xff0c;上线仅 60 天月活用户数便超过 1 亿&#xff0c;成为历史…

开源人脸检测模型MTCNN简单的例子

阅读本文之前可以先参阅----神经网络中的重要概念 如何快速入门深度学习 当使用MTCNN模型进行人脸检测时&#xff0c;你可以使用Python编程语言和相应的深度学习库来实现。下面是一个简单的例子&#xff0c;演示了如何使用MTCNN模型进行人脸检测&#xff1a; 首先&#xff0c;…

每日学习总结20240227

每日总结 20240227 1.如何将字符串通过串口以十六进制进行传输 将文件名或者文件内容通过串口传输&#xff0c;再解析&#xff0c;拼接成源文件 1.1 文件转换 1.1.1 转十六进制 在Linux中&#xff0c;你可以使用 xxd 命令将文本文件转换为十六进制格式。以下是如何在Linux中…

天翼云登录参数JavaSrcipt逆向

天翼云登录参数 password 、comParam_curTime、comParam_seqCode、comParam_signature JavaSrcipt逆向 目标网站 https://m.ctyun.cn/wap/main/auth/login?redirect/my 目标参数 要逆向的有 password、comParam_curTime、comParam_seqCode、comParam_signature 四个参数 …

【蓝桥杯嵌入式】蓝桥杯嵌入式第十四届省赛程序真题,真题分析与代码讲解

&#x1f38a;【蓝桥杯嵌入式】专题正在持续更新中&#xff0c;原理图解析✨&#xff0c;各模块分析✨以及历年真题讲解✨都已更新完毕&#xff0c;欢迎大家前往订阅本专题&#x1f38f; &#x1f38f;【蓝桥杯嵌入式】蓝桥杯第十届省赛真题 &#x1f38f;【蓝桥杯嵌入式】蓝桥…

软件测试笔记(三):黑盒测试

1 黑盒测试概述 黑盒测试也叫功能测试&#xff0c;通过测试来检测每个功能是否都能正常使用。在测试中&#xff0c;把程序看作是一个不能打开的黑盒子&#xff0c;在完全不考虑程序内部结构和内部特性的情况下&#xff0c;对程序接口进行测试&#xff0c;只检查程序功能是否按…

Web前端3D JS框架和库 整理

在WebGL库和SVG/Canvas元素的支持下&#xff0c;JavaScript变得惊人的强大。几乎可以为网络构建任何东西&#xff0c;包括基于浏览器的游戏和本地应用&#xff0c;许多最新的突破性功能都在3D上运行。 为此&#xff0c;「数维图小编」整理了19个交互式3D Javascript库和框架&am…

开心的金明

好久没发文章了&#xff0c;随着这一题开始2024年吧&#xff01; 题目描述 金明今天很开心&#xff0c;家里购置的新房就要领钥匙了&#xff0c;新房里有一间他自己专用的很宽敞的房间。更让他高兴的是&#xff0c;妈妈昨天对他说&#xff1a;“你的房间需要购买哪些物品&…

每日一练:LeeCode-235、二叉搜索树的最近公共祖先【二叉搜索树+DFS+从上往下】

本文是力扣每日一练&#xff1a;LeeCode-235、二叉搜索树的最近公共祖先【二叉搜索树DFS从上往下】 学习与理解过程&#xff0c;本文仅做学习之用&#xff0c;对本题感兴趣的小伙伴可以出门左拐LeeCode。 给定一个二叉搜索树, 找到该树中两个指定节点的最近公共祖先。 百度百…

idea2023新UI风格不见了怎么办?

用了一段时间idea2023,有一天不知道点了什么&#xff0c;整个UI又变成了2022的风格 如果想换成2023的UI风格怎么办&#xff1f; 点击file->setting->new UI->勾选Enable new UI&#xff0c;restart就可以回到最新版本的UI了 新风格

wu-framework-parent 项目明细

wu-framework-parent 介绍 springboot 版本3.2.1 wu-framework-parent 是一款由Java语言开发的框架&#xff0c;目标不写代码但是却能完成功能。 框架涵盖无赖ORM( wu-database-lazy-starter)、仿生组件 、easy框架系列【Easy-Excel、easy-listener、easy-upsert】 授权框架(…

【C++进阶】STL容器--list底层剖析(迭代器封装)

目录 前言 list的结构与框架 list迭代器 list的插入和删除 insert erase list析构函数和拷贝构造 析构函数 拷贝构造 赋值重载 迭代器拷贝构造、析构函数实现问题 const迭代器 思考 总结 前言 前边我们了解了list的一些使用及其注意事项&#xff0c;今天我们进一步深入…

对于大前端开发来说,转鸿蒙开发究竟是福还是祸?

从铺天盖地的市场消息来看&#xff0c;华为即将面世的鸿蒙NEXT系统已经势不可挡了 想必大家都已经迫不及待地想要进行尝试。 估计大家都有着同样的疑问&#xff1a; 会不会是下一个风口&#xff1f;转鸿蒙应用开发难吗&#xff1f; 会不会是下一个风口&#xff1f; 自从鸿蒙…

C++:类与对象(3)

创作不易&#xff0c;感谢三连 一、深入解析构造函数 如上图&#xff0c;在一般情况下&#xff0c;我们认为A类中的_a1和_a2只不过是声明&#xff0c;并没有开空间&#xff0c;而真正的空间开辟是在【定义】的时候&#xff0c;也就是我们根据这个类实例化出整个对象的时候。 …

高级RAG:从理论到 LlamaIndex 实现,解决原始 RAG 管道的局限性

原文地址&#xff1a;Advanced Retrieval-Augmented Generation: From Theory to LlamaIndex Implementation 如何通过在 Python 中实现有针对性的高级 RAG 技术来解决原始 RAG 管道的局限性 2024 年 2 月 19 日 如何通过在 Python 中实现有针对性的高级 RAG 技术来解决 naiv…

【LeetCode刷题】146. LRU 缓存

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类&#xff1a; LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中&#xff0c;则返回关键字的值&#xff0c;否则返回 -…

【数据结构】B树,B+树,B*树

文章目录 一、B树1.B树的定义2.B树的插入3.B树的中序遍历 二、B树和B*树1.B树的定义2.B树的插入3.B*树的定义4.B树系列总结 三、B树与B树的应用 一、B树 1.B树的定义 1. 在内存中搜索效率高的数据结构有AVL树&#xff0c;红黑树&#xff0c;哈希表等&#xff0c;但这是在内存…

SpreadJS+vue3练手使用

SpreadJS的练手使用 // 首先在 package.json 这个文件里{"name": "app-admin","private": true,"version": "0.0.0","type": "module","scripts": {"dev": "vite",&quo…