Spring Boot WebFlux:实现web(Server-Sent Events)事件异步推送

WebFlux

Spring Boot中,Flux是一个重要的概念,它是Spring Framework 5.0以后引入的响应式编程框架WebFlux的核心组件之一。FluxReactor项目的一部分,它实现了Reactive Streams规范,用于处理异步、非阻塞的数据流。

与传统的Spring MVC不同,WebFlux不需要Servlet API,而是通过Reactor项目提供的FluxMono来构建响应式应用程序。Flux用于表示包含0到N个元素的异步序列,而Mono则表示包含0到1个元素的异步序列。这使得WebFlux可以在有限的资源下提高系统的吞吐量和伸缩性,从而在处理大量请求时表现出更好的性能。

WebFlux中,所有的操作都是非阻塞的,并且基于Reactor的背压机制来控制数据流的速度,从而防止系统资源的过度消耗。这使得WebFlux在处理高并发请求时具有更好的稳定性和可扩展性。

Spring Boot中使用Flux时,通常需要在控制器方法中返回一个Flux对象,以表示一个异步的数据流。然后,可以使用各种操作符来对这个数据流进行转换、过滤、聚合等操作,以满足业务需求。例如,可以使用map操作符将数据流中的每个元素转换为另一种形式,使用filter操作符过滤掉不符合条件的元素,使用reduce操作符将数据流中的所有元素聚合成一个结果等。

总之,FluxSpring Boot中实现响应式编程的重要组件之一,它使得应用程序能够更好地处理高并发请求,提高系统的吞吐量和伸缩性。

Flux 的主要特性包括:

  • 异步非阻塞:Flux 旨在以非阻塞的方式处理数据,这意味着它不会阻塞调用线程,而是立即返回一个表示异步计算结果的对象。这使得它能够在高并发环境下提供更好的性能和响应能力。

  • 背压(Backpressure):背压是响应式编程中的一个重要概念,它允许接收方控制发送方的数据发送速率,以防止数据过快产生而导致的资源耗尽。Flux 提供了背压机制,使得接收方可以通知发送方其处理能力,从而避免数据丢失或资源耗尽。

  • 冷流与热流:Flux 中的流可以是冷的(Cold)或热的(Hot)。冷流会为每个订阅者重新生成数据,而热流则会广播数据给所有订阅者,无论订阅者何时订阅。

  • 操作符:Flux 提供了一系列的操作符,这些操作符可以链式调用,用于对流中的数据进行转换、过滤、聚合等操作。这些操作符使得在流上执行复杂的逻辑变得简单且直观。

  • 错误处理:Flux 支持错误处理机制,可以在流中出现错误时进行适当的处理,如重试、忽略错误或传播错误。

  • 与其他响应式库的集成:Flux 可以与其他遵循 Reactive Streams 规范的响应式库无缝集成,如 RxJava、Rx.NET 等。

Spring Boot 应用程序中,Flux 通常用于构建响应式 Web 服务。你可以使用 Spring WebFlux基于 Project Reactor)来创建异步的、非阻塞的 Web 控制器,这些控制器可以返回 Flux 或 Mono 对象作为响应。这种模型非常适合处理大量并发请求,特别是在微服务架构中。

WebFlux和sse

FluxSpring Framework 5.0中引入的响应式编程库Project Reactor的核心组件之一。它是一个用于表示异步、非阻塞数据流的类型,你可以在其上应用各种操作符来进行数据转换、过滤和聚合等操作。在Spring Boot应用程序中,Flux通常用于构建响应式Web服务,返回给客户端的是一个数据流而不是一次性的数据包。这使得客户端可以持续接收服务器的更新,而不需要频繁地发起新的请求。

SSE(Server-Sent Events)是另一种在服务器端和客户端之间建立异步通信的机制。与WebSocket不同,SSE是一种单向通信协议,只允许服务器向客户端发送数据。当服务器有新的数据产生时,它会通过SSE连接将数据发送给客户端。SSE基于HTTP协议,这意味着它可以在现有的HTTP基础设施上工作,而不需要额外的配置或协议支持。此外,SSE使用简单的文本格式来表示传输的数据,这使得它在浏览器端的支持非常广泛,除了IE浏览器外,其他现代浏览器都支持SSE

Spring Boot应用程序中,你可以使用WebFlux框架来实现SSE。在WebFlux中,你可以返回一个类型为Flux<ServerSentEvent>的对象来创建一个SSE端点。ServerSentEvent是一个特殊的类型,用于表示服务器发送给客户端的事件。当这个Flux对象发出事件时,它们将作为SSE事件发送给客户端。

总结起来,FluxSSE都是用于在服务器端和客户端之间建立异步通信的技术。Flux是一个更通用的响应式编程库,可以用于构建各种异步场景,而SSE则是一种专门用于服务器向客户端推送事件的机制。在Spring Boot应用程序中,你可以使用Flux来实现SSE端点,从而利用SSE的优势来提供实时数据更新给客户端。

教程

springboot 项目引入webflux依赖

       <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>

创建flux控制器,FluxController 类,一下代码是接入文心一言大模型web流式接口,并使用flux返回给前端。


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;import java.io.IOException;/*** @author tarzan*/
@RestController
@RequestMapping
@Slf4j
public class FluxController {public static final String API_KEY = "";public static final String SECRET_KEY = "";private static final String ACCESS_TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=" + API_KEY + "&client_secret=" + SECRET_KEY;private static final String CHAT_URL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant";@PostMapping(path = "/event-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> helloWorld() throws IOException {/*  return Flux.fromArray("hello world".split(" "))// 每个元素延迟500毫秒,以模拟逐字效果.delayElements(Duration.ofMillis(500));*/// 构造聊天请求参数JSONObject chatPayload = new JSONObject();JSONArray messagesArray = new JSONArray();JSONObject message = new JSONObject();message.put("role", "user");message.put("content", "为什么现在的农村出来的大学生,混的都不好?");messagesArray.put(message);chatPayload.put("messages", messagesArray);chatPayload.put("stream", true);StringBuffer answer=new StringBuffer();return WEB_CLIENT.post().uri("rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token=" + getAccessToken()).contentType(MediaType.APPLICATION_JSON).bodyValue(chatPayload.toString()).retrieve().bodyToFlux(String.class)// 可能需要其他流处理,比如map、filter等.map(data -> {String result=JSON.parseObject(data).getString("result");answer.append(result);return result;}).doOnComplete(() -> {// 当Flux完成时,输出结束消息System.out.println("处理完毕,流已关闭。");System.out.println(answer);});}private static final WebClient WEB_CLIENT = WebClient.builder().baseUrl("https://aip.baidubce.com").defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_STREAM_JSON_VALUE).build();static String getAccessToken() {// 构建请求体String requestBody = "grant_type=client_credentials"+ "&client_id=" + API_KEY+ "&client_secret=" + SECRET_KEY;return WEB_CLIENT.post().uri("/oauth/2.0/token").bodyValue(requestBody).retrieve()// 假设服务器返回的是 JSON 格式的字符串.bodyToMono(String.class).map(data ->JSON.parseObject(data).getString("access_token")).block();}
}

前端代码如下:

<html lang="zh">
<head>
<meta charset="UTF-8"/>
<title>es</title>
<script src="js/jquery-3.1.1.js"></script>
</head>
<noscript><h2 style="color: #e80b0a;">Sorry,浏览器不支持WebSocket</h2></noscript>
<body>
<div>
<p id="data"></p>
</div>
</body>
<script type="text/javascript">$(function() {/*      const eventSource = new EventSource("/event-stream");const dataDiv = $('#data');eventSource.onmessage = function(event) {console.log('Received message:'+event.data);dataDiv.append(event.data);};eventSource.onerror = function(error) {console.error('EventSource failed:'+error);dataDiv.append(error);eventSource.close();};*/const dataDiv = $('#data');// 准备要发送的数据const data = new FormData();data.append('key1', 'value1');data.append('key2', 'value2');// 发送 POST 请求并处理响应fetch('/event-stream', {method: 'POST',body: data}).then(response => {// 检查响应是否为 SSE 流if (response.headers.get('Content-Type') === 'text/event-stream') {// 返回一个可读流以读取响应体return response.body.getReader();} else {throw new Error('Response is not an SSE stream');}}).then(reader => {// 递归处理 SSE 事件的函数function processSSEEvent(reader) {return reader.read().then(({done, value}) => {if (done) {console.log('SSE stream ended');return;}// value 是一个包含数据的 Uint8Arrayconst eventStr = new TextDecoder().decode(value).trim();//  const event = eventStr.trim();// 处理事件(例如,解析为 JSON,如果事件是 JSON 格式)console.log('Received SSE event:', eventStr);const event = eventStr.replace(/^data:/, '');dataDiv.append(event);// 继续读取下一个事件return processSSEEvent(reader);});}// 开始处理 SSE 事件return processSSEEvent(reader);}).catch(error => {console.error('Error fetching SSE stream:', error);})});
</script>
</html>

前端代码接收flux流式返回结果,将响应结果不断输入到前端页面上。代码中注释部分EventSource,只支持get请求,如果是调用get请求的接口可以使用注释的代码,实现flux返回的结果。

补充

上面代码只适合接收纯文本的数据,如果是json数据请修改代码如下:

   let aiType = chatType.val();const data = new FormData();data.append('qa', qa);fetch(`/chat/${user.id}/${aiType}`, {method: 'POST',body: data, // 你的POST数据}).then(response => {if (!response.body || !response.body.getReader) {chatNormal();throw new Error('Response does not support streaming');}const reader = response.body.getReader();let buffer = '';let fullAnswer = '';let isStart = true;return new ReadableStream({start(controller) {function readChunk() {return reader.read().then(async ({done, value}) => {if (done) {controller.close();chatNormal();// 复制功能copy();// 分享功能share();return;}const chunk = new TextDecoder().decode(value);buffer += chunk;// 分割并处理每一行const lines = buffer.split('\n');for (let i = 0; i < lines.length - 1; i++) {const eventStr = lines[i];if (eventStr !== '') {let event = eventStr.replace(/^data:/, '');let res = JSON.parse(event);await printCharacters(res);if (isStart) {fullAnswer = '';isStart = false;}}}buffer = lines[lines.length - 1]; // 保留未完整的一行await readChunk(); // 继续读取下一块数据});}async function printCharacters(res) {let sentence = res.answer;for (let word of sentence.split('')) {await new Promise(resolve => setTimeout(() => {fullAnswer = fullAnswer + word;subscribe(res, fullAnswer);resolve();}, 25));}}readChunk();},});});

以上代码是我接入AI聊天接口,通过event-stream 返回给前端,实现逐字输出的部分代码片段

EventSource简介

EventSourceHTML5中引入的一种新的API,它允许服务器向客户端推送实时事件。这种推送是基于HTTP协议的,并且使用一种特殊的MIME类型,即"text/event-stream",这使得服务器能够发送一系列的事件到客户端。

EventSource主要用途是实现服务器和客户端之间的实时通信。客户端通过创建一个EventSource对象并指定一个URL,就可以开始监听服务器在该URL上发送的事件。一旦服务器有新的事件数据要发送,它就会将这些数据以特定的格式(即"event: data"的形式)发送给客户端。客户端在接收到这些数据后,可以通过注册的事件处理函数来处理这些数据。

EventSource具有一些重要的特性,包括实时性、低延迟和易用性。由于它使用长连接的方式进行数据传输,因此相比于传统的轮询方式,它能够更加高效地传输数据。此外,由于它是基于HTTP协议的,因此它可以在各种不同的场景下使用,并且与WebSocket和长轮询等方式兼容。

总的来说,EventSource是一种非常有用的技术,它使得服务器能够实时地向客户端推送事件,从而提高了Web应用程序的实时性和响应性。

实现效果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2804266.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Grounded-SAM(最强Zero-Shot视觉应用):本地部署及各个模块的全网最详细使用教程!

本篇文章主要对Grounded-SAM项目的部署以及使用进行讲解&#xff0c;目的是使读者可以直接参考文档去使用Grounded-SAM&#xff0c;而无需再去参考Github一步步自己去分析尝试&#xff08;也算是我使用过程中的心得&#xff09;。 对于Grounded-SAM 技术报告的paper阅读可以跳转…

介绍 CI / CD

目录 一、介绍 CI / CD 1、为什么要 CI / CD 方法简介 1、持续集成 2、持续交付 3、持续部署 2、GitLab CI / CD简介 3、GitLab CI / CD 的工作原理 4、基本CI / CD工作流程 5、首次设置 GitLab CI / CD 6、GitLab CI / CD功能集 一、介绍 CI / CD 在本文档中&#x…

echarts 实现x轴文字过长时折行展示

代码如下&#xff1a; this.options {color: ["#0075FF", "#00E2C4", "#FCA884", "#FFCB11"],grid: {top: "25%",bottom: "6%",right: "8%",left: "8%",containLabel: true,},legend: {top…

springboot751社区维修平台

springboot751社区维修平台 获取源码——》公主号&#xff1a;计算机专业毕设大全

TikTok账号注册指南:TikTok的注册方式有哪些?

随着全球数字化浪潮的加速&#xff0c;TikTok已成为跨越文化和国界的社交媒体巨头。对于寻找跨境电商机会的卖家来说&#xff0c;一个有效的TikTok账号都是打开通往成功之门的钥匙。本文将为大家详细介绍TikTok账号的注册方式&#xff0c;并提供一些实用的技巧&#xff0c;帮助…

018—pandas 生成笛卡尔积排列组合合并多列字符串数据

思路&#xff1a; 本需求需要将给定的几列数据&#xff0c;生成一个排列组合形式的数据列&#xff0c;利用到 Pandas 多层索引生成的笛卡尔积的方法。 二、使用步骤 1.引入库 代码如下&#xff08;示例&#xff09;&#xff1a; import pandas as pd2.读入数据 代码如下&…

【动态规划】【回文】【字符串】1147. 段式回文

作者推荐 【广度优先搜索】【网格】【割点】【 推荐】1263. 推箱子 本文涉及知识点 动态规划汇总 LeetCode1147段式回文 你会得到一个字符串 text 。你应该把它分成 k 个子字符串 (subtext1, subtext2&#xff0c;…&#xff0c; subtextk) &#xff0c;要求满足: subtext…

LeetCode104.二叉树的最大深度

题目 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3思路 计算二叉树的最大深度通常可以使用 递归 来实现。我们可以从根…

第九节HarmonyOS 常用基础组件26-Radio

1、描述 单选框&#xff0c;提供相应的用户交互选择项。 2、接口 Radio(options:{value:string, group:string}) 3、参数 参数名 参数类型 必填 描述 value string 是 当前单选框的值。 group string 是 当前单选框的所属组名称&#xff0c;相同group的Radio只能…

C语言-指针初学速成

1.指针是什么 C语言指针是一种特殊的变量&#xff0c;用于存储内存地址。它可以指向其他变量或者其他数据结构&#xff0c;通过指针可以直接访问或修改存储在指定地址的值。指针可以帮助我们在程序中动态地分配和释放内存&#xff0c;以及进行复杂的数据操作。在C语言中&#…

【算法分析与设计】1的个数

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;算法分析与设计 ⛺️稳中求进&#xff0c;晒太阳 题目 编写一个函数&#xff0c;输入是一个无符号整数&#xff08;以二进制串的形式&#xff09;&#xff0c;返回其二进制表达式中数字位…

钧达股份:光伏跨界新贵只身赴港股,光伏“秩序重塑”?

2月21日&#xff0c;钧达股份终是在“千呼万唤”之中披露最新业绩快报。 快报显示&#xff0c;钧达股份预计2023年经调整后营业收入183.97亿元&#xff0c;同比增长58.65%&#xff0c;归母净利润8.32亿元&#xff0c;同比增长16.00%。 其中&#xff0c;由于Q4完整计提了9.5GW…

洛谷 【算法1-2】排序

【算法1-2】排序 - 题单 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 鄙人不才&#xff0c;刷洛谷&#xff0c;迎蓝桥&#xff0c;【算法1-2】排序 已刷&#xff0c;现将 AC 代码献上&#xff0c;望有助于各位 P1271 选举学生会 【深基9.例1】选举学生会 - 洛谷 题目 解答…

概率密度函数(PDF)与神经网络中的激活函数

原创:项道德(daode3056,daode1212) 在量子力学中&#xff0c;许多现象都是统计的结果&#xff0c;基本上用的是正态分布&#xff0c;然而&#xff0c;从本质上思考&#xff0c;应该还存在低阶的分布&#xff0c;标准的正态分布是它的极限&#xff0c;这样一来&#xff0c;或许在…

《论文阅读》通过识别对话中的情绪原因来提高共情回复的产生 EMNLP 2021

《论文阅读》通过识别对话中的情绪原因来提高共情回复的产生 EMNLP 2021 前言简介方法实现Emotion ReasonerResponse Generator实验结果示例总结前言 亲身阅读感受分享,细节画图解释,再也不用担心看不懂论文啦~ 无抄袭,无复制,纯手工敲击键盘~ 今天为大家带来的是《Improv…

备战蓝桥杯—— 双指针技巧巧答链表1

对于单链表相关的问题&#xff0c;双指针技巧是一种非常广泛且有效的解决方法。以下是一些常见问题以及使用双指针技巧解决&#xff1a; 合并两个有序链表&#xff1a; 使用两个指针分别指向两个链表的头部&#xff0c;逐一比较节点的值&#xff0c;将较小的节点链接到结果链表…

【ECharts】调用接口获取后端数据的四种方法

使用eacharts做大屏&#xff0c;需要使用后端数据&#xff0c;下面的方法是自己试过有效的&#xff0c;有什么不对的&#xff0c;望各位大佬指点。 目录 方法一&#xff1a;在mounted中使用定时器调用eacharts方法&#xff08;定时器可以获取到data中的数据&#xff09; 方法…

图解李白的“朋友圈”

《长安三万里》作为2023年票房第一的国漫电影&#xff0c;以安史之乱为背景&#xff0c;从诗人高适的视角铺设了一幅绚丽的历史长卷&#xff0c;细细讲述“诗仙”李白跌宕起伏的一生&#xff0c;以及大唐盛世一路荣耀幻灭的唏嘘。同时&#xff0c;在这部动画电影中出现了多位大…

CP04大语言模型ChatGLM3-6B特性代码解读(2)

CP04大语言模型ChatGLM3-6B特性代码解读&#xff08;2&#xff09; 文章目录 CP04大语言模型ChatGLM3-6B特性代码解读&#xff08;2&#xff09;构建对话demo_chat.py定义client对象与LLM进行对话 构建工具调用demo_tool.py定义client对象定义工具调用提示词定义main&#xff0…

接口测试需求分析

测试接口的时候&#xff0c;可能很多人都会想&#xff0c;按着研发给的接口协议文档来测&#xff0c;不就好了吗&#xff1f; 其实&#xff0c;对于接口的测试&#xff0c;还需要有点深度的需求分析&#xff0c;然后再进行对应的测试。对于接口测试&#xff0c;这里有个不太详…