SSE(Server Sent Event)实战(3)- Spring Web Flux 实现

上篇博客 SSE(Server Sent Event)实战(2)- Spring MVC 实现,我们用 Spring MVC 实现了简单的消息推送,并且留下了两个问题,这篇博客,我们用 Spring Web Flux 实现,并且看看这两个问题怎么解决。

一、服务端实现

/** XingPan.com* Copyright (C) 2021-2024 All Rights Reserved.*/
package com.sse.demo2.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.time.Duration;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author liuyuan* @version SseController.java, v 0.1 2024-07-15 14:24*/
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {private static final HttpClient HTTP_CLIENT = HttpClient.create().responseTimeout(Duration.ofSeconds(5));private final Map<String, FluxSink<String>> USER_CONNECTIONS = new ConcurrentHashMap<>();/*** 用来存储用户和本机地址,实际生成请用 redis*/private final Map<String, String> USER_CLIENT = new ConcurrentHashMap<>();/*** 创建连接*/@GetMapping(value = "/create-connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> createConnect(@RequestParam("userId") String userId) {// 获取本机地址String hostAddress = this.getHostAddress();Flux<String> businessData = Flux.create(sink -> {USER_CONNECTIONS.put(userId, sink);USER_CLIENT.put(userId, hostAddress);log.info("创建了用户[{}]的SSE连接", userId);sink.onDispose(() -> {USER_CONNECTIONS.remove(userId);USER_CLIENT.remove(userId);log.info("移除用户[{}]的SSE连接", userId);});});// 创建心跳Flux<String> heartbeat = Flux.interval(Duration.ofMinutes(1)).map(tick -> "data: heartbeat\n\n");return Flux.merge(businessData, heartbeat);}/*** 发送消息 gateway*/@GetMapping("/send-message-gateway")public Mono<RpcResult<Boolean>> sendMessageGateway(@RequestParam("userId") String userId, @RequestParam("message") String message) {String userHostAddress = USER_CLIENT.get(userId);if (userHostAddress == null) {log.info("用户[{}]的SSE连接不存在,无法发送消息", userId);return Mono.just(RpcResult.error("10001", "SSE连接不存在,无法发送消息"));}// 获取本机地址和用户连接地址比较,如果相同,直接使用localhost发消息String hostAddress = this.getHostAddress();userHostAddress = userHostAddress.equals(hostAddress) ? "localhost" : userHostAddress;String baseUrl = "http://" + userHostAddress + ":8080";log.info("发送消息 > baseUrl = {}", baseUrl);WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HTTP_CLIENT)).baseUrl(baseUrl).build();RpcResult<Boolean> errorResult = RpcResult.error("10002", "消息发送失败");return webClient.get().uri("/sse/send-message?userId={userId}&message={message}", userId, message).exchangeToMono(clientResponse -> {if (clientResponse.statusCode().is2xxSuccessful()) {log.info("消息发送成功 > 用户 = {},消息内容 = {}", userId, message);return Mono.just(RpcResult.success(true));} else {log.error("消息发送失败 > 状态码 = {},用户 = {},消息内容 = {}", clientResponse.statusCode().value(), userId, message);return Mono.just(errorResult);}}).onErrorResume(error -> {log.error("消息发送失败 > 用户 = {}, 消息内容 = {}, e = ", userId, message, error);return Mono.just(errorResult);});}/*** 发送消息*/@GetMapping("/send-message")public Mono<Void> sendMessage(@RequestParam("userId") String userId, @RequestParam("message") String message) {FluxSink<String> sink = USER_CONNECTIONS.get(userId);if (sink != null) {try {sink.next(message);log.info("给用户[{}]发送消息成功: {}", userId, message);} catch (Exception e) {log.error("向用户[{}]发送消息失败,sink可能已关闭或无效", userId, e);USER_CONNECTIONS.remove(userId);USER_CLIENT.remove(userId);}} else {log.info("用户[{}]的SSE连接不存在或已关闭,无法发送消息", userId);}return Mono.empty();}private String getHostAddress() {String hostAddress = "localhost";try {Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();while (networkInterfaces.hasMoreElements()) {NetworkInterface networkInterface = networkInterfaces.nextElement();Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();while (inetAddresses.hasMoreElements()) {InetAddress inetAddress = inetAddresses.nextElement();if (!inetAddress.isLoopbackAddress() && !inetAddress.getHostAddress().contains(":") && inetAddress.getHostAddress().startsWith("10.")) {hostAddress = inetAddress.getHostAddress();}}}} catch (SocketException e) {log.error("获取主机地址失败", e);}log.info("获取主机地址 > hostAddress = {}", hostAddress);return hostAddress;}
}
  1. 如果我们服务设置了最大连接时间,比如 3 分钟,而服务端又长时间没有消息推送给客户端,导致长连接被关闭该怎么办?

在创建连接时/create-connect,增加心跳,只要心跳频率小于超时时间,基本就可以解决这个问题,但是前端要注意隐藏心跳内容。

  1. 实际生产环境,我们肯定是多个实例部署,那么怎么保证创建连接和发送消息是在同一个实例完成?如果不是一个实例,就意味着用户没有建立连接,消息肯定发送失败。

a. 将用户id 和用户请求的实例 ip 绑定,我这里用的是Map(USER_CLIENT)存储,生产请换成分布式缓存;
b. 服务端发送消息使用/send-message-gateway接口,这个接口只做消息分发,不真实发送消息。从USER_CLIENT中获取用户所在的实例,然后将请求分发到具体实例;
c. /send-message-gateway将请求打到/send-message,然后给用户推送消息;

二、客户端实现


<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE Demo</title><script>        document.addEventListener('DOMContentLoaded', function () {var userId = "1";// 创建一个新的EventSource对象var source = new EventSource('http://localhost:8080/sse/create-connect?userId=' + userId);// 当连接打开时触发source.onopen = function (event) {console.log('SSE连接已打开');};// 当从服务器接收到消息时触发source.onmessage = function (event) {// event.data 包含服务器发送的文本数据console.log('接收到消息:', event.data);// 在页面上显示消息var messagesDiv = document.getElementById('messages');if (messagesDiv) {messagesDiv.innerHTML += '<p>' + event.data + '</p>'; // 直接使用event.data} else {console.error('未找到消息容器元素');}};// 当发生错误时触发source.onerror = function (event) {console.error('SSE连接错误:', event);};});</script>
</head>
<body>
<div id="messages"><!-- 这里将显示接收到的消息 -->
</div>
</body>
</html>

三、启动项目

  1. 运行 Spring 项目
  2. 浏览器打开 index.html文件
  3. 调用发送消息接口
    curl http://localhost:8080/sse/send-message-gateway?userId=1&message=test0001
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3246983.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第二篇 Linux系统编程篇-第三十一章 文件IO和标准IO

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…

进程的优先级与命令行参数

一、进程的优先级 1、理解 制定一个进程获取某种资源的先后顺序。 在Linux中数字越小优先级越高。 2、优先级的意义 进程访问资源始终是有限的&#xff0c;系统中的进程都是很多的。 操作系统关于调度和优先级规则 分时操作系统&#xff0c;基本公平。如果进程长时间不被…

Study--Oracle-07-ASM自动存储管理(二)

一、ASM安装准备条件 1、ASM支持存储类型 本地祼设备(本地的磁盘和分区) 网络附加存储(NAS) 存储区域网络(SAN) 2、ASM使用本地裸设备,要点: 已经被挂载到操作系统上或者已经做了分区 映射裸设备为文件名 设置正确的权限(针对grid用户和asmadmin组,权限为660) 二、OR…

玄机-第一章 应急响应-webshell查杀

文章目录 前言简介应急开始准备工作步骤 1步骤 2步骤 3步骤 4 总结 前言 作者是个垃圾&#xff0c;第一次玩玄机太紧张了&#xff0c;不知道flag是啥样找了半天&#xff0c;第二次开靶机多次尝试才知道格式。争取下次一次过。 简介 靶机账号密码 root xjwebshell 1.黑客webshel…

网络割接方案通用模板

第一章 项目概述 1.1 编写目的 为规范“十三五”以来&#xff0c;随着移动互联新技术的发展以及我国政府职能的不断转变&#xff0c; 我国的政法网络进入新的发展阶段&#xff0c;跨地域、跨部门、跨系统的信息共享、业务 协同以及智慧政务等成为了各地电子政务的重点建设内容。…

【数学建模】——多领域资源优化中的创新应用-六大经典问题解答

目录 题目1&#xff1a;截取条材 题目 1.1问题描述 1.2 数学模型 1.3 求解 1.4 解答 题目2&#xff1a;商店进货销售计划 题目 2.1 问题描述 2.2 数学模型 2.3 求解 2.4 解答 题目3&#xff1a;货船装载问题 题目 3.1问题重述 3.2 数学模型 3.3 求解 3.4 解…

MATLAB-Simulink模型加密

1、封装子系统 假如已经是子系统&#xff0c;需要确认子系统是否为虚拟子系统如果是虚拟子系统需要将虚拟子系统设置为非虚拟子系统也就是原子子系统。 右击子系统模块&#xff0c;选择Block Parameter->勾选Treat as atomic unit 设置仿真步长 2、将设置好的子系统转换为…

【Outlook】从Outlook新版回归经典版全攻略

引言 在微软宣布计划于2024年底淘汰邮件应用&#xff08;Mail app&#xff09;之后&#xff0c;许多用户发现新版Outlook应用&#xff08;Outlook (new)&#xff09;在他们的Windows 11/10系统上自动启动。如果您更倾向于使用经典版Outlook&#xff08;Outlook (classic)&…

实验丰富、原创改进!|多策略改进蜣螂优化算法(MATLAB)

本文内容来源于本人公众号&#xff1a;KAU的云实验台&#xff0c;更新内容&#xff1a;智能优化算法及其改进应用。 本文核心内容&#xff1a; 新颖的多策略改进蜣螂优化算法 对比算法包括&#xff1a;高引用/新发布/经典/其他DBO变体&#xff08;共11种&#xff09; 实验设计…

Go语言中GC(垃圾回收回收机制)三色标记与混合写屏障

5、Golang三色标记混合写屏障GC模式全分析 (yuque.com) 第1讲-课程目标_哔哩哔哩_bilibili Golang三色标记GC混合写屏障 Go V1.3之前的标记清除&#xff08;mark and sweep) 垃圾回收、内存管理、自动适放、三色标记法、STW (stop the world) 图的遍历&#xff1f;可达性分…

ArgMed-Agents:通过多个智能体论证方案增强大模型,进行可解释的临床决策推理

ArgMed-Agents&#xff1a;通过多个智能体论证方案增强大模型&#xff0c;进行可解释的临床决策推理 提出背景ArgMed-Agents 框架目的解法拆解逻辑链 临床讨论的论证方案&#xff08;ASCD&#xff09;论证方案用于决策&#xff08;ASDM&#xff09;论证方案用于副作用&#xff…

python课设——宾馆管理系统

python课设——宾馆管理系统 数据库课设-宾馆管理系统-python3.7pyqt5 简介 大二数据库课程设计&#xff08;3-4天工作量&#xff09;的项目&#xff0c;登录界面的ui设计参考了他人成果&#xff0c;其余ui以及所有后端部分全部独立完成&#xff0c;详细功能见功能模块图使用…

【ffmpeg命令】制作一个属于你自己的动图表情包

文章目录 前言如何制作一个动态表情包制作动图表情包转换分辨率减少帧率截取主要内容转换为gif动图去除水印 最终结果总结 前言 在数字时代&#xff0c;动图表情包已经成为我们日常交流的重要组成部分。它们富有表现力&#xff0c;能够传达出我们无法用语言表述的情感和信息。…

LangChain-v0.2 Build an Agent 构建代理

语言模型本身不能采取行动&#xff0c;它们只是输出文本。LangChain的一个重要用例是创建代理。代理是使用LLM作为推理引擎来确定要采取哪些行动&#xff0c;以及传递哪些输入的系统。执行操作后&#xff0c;可以将结果反馈到LLM中&#xff0c;以确定是否需要更多操作&#xff…

博客前端项目学习day01

这里写自定义目录标题 登录创建项目配置环境变量&#xff0c;方便使用登录页面验证码登陆表单 在VScode上写前端&#xff0c;采用vue3。 登录 创建项目 检查node版本 node -v 创建一个新的项目 npm init vitelatest blog-front-admin 中间会弹出询问是否要安装包&#xff0c…

docker容器重启错误解决方案

目录 起因解决方案重启 起因 是这样的&#xff0c;今天客户服务器的服务器突然断电了&#xff0c;原本是配置了自启动的项目&#xff0c;在重启之后发现还是无法登录&#xff0c;然后又看了一眼工控机&#xff0c;欸&#xff0c;这边居然可以&#xff0c;那么问题就直接排除了…

大样本 OLS 模型及 Stata 具体操作步骤

目录 一、引言 二、理论原理 三、小样本 OLS 和大样本 OLS 的区别 四、数据准备 五、程序代码及解释 六、代码运行结果 一、引言 在统计学和计量经济学中&#xff0c;普通最小二乘法&#xff08;Ordinary Least Squares&#xff0c;OLS&#xff09;是一种广泛应用的线性回…

QT-RTSP相机监控视频流

QT-RTSP相机监控视频流 一、演示效果二、关键程序三、下载链接 一、演示效果 二、关键程序 #include "mainwindow.h"#include <QDebug>MainWindow::MainWindow(QWidget *parent) : QMainWindow(parent), m_settings("outSmart", "LiveWatcher&…

为什么品牌需要做 IP 形象?

品牌做IP形象的原因有多方面&#xff0c;这些原因共同构成了IP形象在品牌建设中的重要性和价值&#xff0c;主要原因有以下几个方面&#xff1a; 增强品牌识别度与记忆点&#xff1a; IP形象作为品牌的视觉符号&#xff0c;具有独特性和辨识性&#xff0c;能够在消费者心中留…

CSA笔记2-文件管理命令

tree 以树状图显示多级目录 示例&#xff1a; [rootlocalhost ~]# tree haha/ haha/ └── 111 └── 222 2 directories, 0 files [rootlocalhost ~]# tree -L 1 haha/haha/ └── 111 echo > >> < << 示例&#xff1a; [rootxxx ~]#…