【stomp 实战】spring websocket用户消息发送源码分析

这一节,我们学习用户消息是如何发送的。

消息的分类

spring websocket将消息分为两种,一种是给指定的用户发送(用户消息),一种是广播消息,即给所有用户发送消息。那怎么区分这两种消息呢?那就是用前缀了。

用户消息的前缀

  • 不配置的情况下,默认用户消息的前缀是/user
  • 也可以通过下面的方式来配置用户消息
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {/*** stompClient.subscribe("/user/topic/subNewMsg",...)* 这个时候,后端推送消息应该这么写* msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);* 即去掉了/user前缀*/registry.setUserDestinationPrefix(WsConstants.USER_DESTINATION_PREFIX);}
  • 默认情况下,/user是用户消息前缀,那么前端订阅的代码可以这么写
 //订阅用户消息topic1stompClient.subscribe("/user/topic/answer", function (response) {//do something});
  • 后端的发送消息的代码可以这么写,注意,在这里发送的时候,调用的convertAndSendToUser没有带/user前缀
    private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, "/topic/answer", msg);}

广播消息的前缀

  • 广播消息没有默认值,必须显示地指定
  • 配置广播消息的前缀是这么配置,通过/topic或者/queue前缀来订阅的,就是广播消息
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker("/topic", "/queue")//配置stomp协议里, server返回的心跳.setHeartbeatValue(new long[]{10000L, 10000L})//配置发送心跳的scheduler.setTaskScheduler(new DefaultManagedTaskScheduler());}
  • 前端代码可以这么写
//订阅广播消息topicstompClient.subscribe("/topic/boardCast/hello", function (response) {// do something});
  • 后端代码可以这么写
  private final SimpMessageSendingOperations msgOperations;public void echo2(Msg msg) {log.info("收到的消息为:{}", msg.getContent());msgOperations.convertAndSend("/topic/boardCast/hello", "hello boardCast Message");}

发送用户消息源码分析

用户订阅过程

发送消息,本质上就是从内存中找到注册的用户,通过用户名找到用户会话,在从用户会话中找到该用户的订阅,如果该用户有该订阅,那么就发送消息给前端。

总结一下用户和会话之间的关系,如下图
在这里插入图片描述
如果这块不太熟悉,建议回顾这篇文章,了解一下用户,用户会话,订阅之间的关系:【stomp 实战】Spring websocket 用户订阅和会话的管理源码分析

我们通过Debug来看一下,前端执行用户订阅,经历了哪些过程。
假设,当前登录用户是1001

  stompClient.subscribe("/user/topic/answer", function (response) {//do something});

该用户建立连接,并且绑定1001的用户会话后,执行后端的订阅注册
DefaultSimpUserRegistry响应订阅事件代码如下:
在这里插入图片描述
可以看到,当前的sessionId,destination

在这里插入图片描述
将订阅放到一个subscriptions的map里面。缓存在内存中。

用户消息的发送

后端代码是这么写的,我们来调试一下

    private final SimpMessageSendingOperations msgOperations;public void echo(Principal principal, Msg msg) {msgOperations.convertAndSendToUser(username, "/topic/answer", msg);}

经过层层调用,发现调到了下面的方法
在这里插入图片描述
发现我们的发送目的地变成了这个:this.destinationPrefix + user + destination
通过调试时,发现值如上图所示。
也就是说,我们的发送目的,变成了/user+用户名+我们传的入参/topic/answer
然后再进入下面的代码

//AbstractMessageSendingTemplate@Overridepublic void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,@Nullable MessagePostProcessor postProcessor) throws MessagingException {//对消息进行转换,对象转字符串,或者字节数组之类的Message<?> message = doConvert(payload, headers, postProcessor);//调用Send发送send(destination, message);}

做了两个事:

  • 对消息进行转换,对象转字符串,或者字节数组之类的
  • 调用Send发送

再来看下send方法

	@Overridepublic void send(D destination, Message<?> message) {doSend(destination, message);}

再调用doSend,由子类SimpMessagingTemplate实现。

//SimpMessagingTemplate@Overrideprotected void doSend(String destination, Message<?> message) {Assert.notNull(destination, "Destination must not be null");SimpMessageHeaderAccessor simpAccessor =MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);if (simpAccessor != null) {if (simpAccessor.isMutable()) {simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);simpAccessor.setImmutable();sendInternal(message);return;}else {// Try and keep the original accessor typesimpAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getMutableAccessor(message);initHeaders(simpAccessor);}}else {simpAccessor = SimpMessageHeaderAccessor.wrap(message);initHeaders(simpAccessor);}simpAccessor.setDestination(destination);simpAccessor.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);message = MessageBuilder.createMessage(message.getPayload(), simpAccessor.getMessageHeaders());sendInternal(message);}

其中最关键的是sendInternal

private void sendInternal(Message<?> message) {String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());Assert.notNull(destination, "Destination header required");long timeout = this.sendTimeout;boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));if (!sent) {throw new MessageDeliveryException(message,"Failed to send message to destination '" + destination + "' within timeout: " + timeout);}}

然后再通过messageChannel来发送此条消息。

//AbstractMessageChannel@Overridepublic final boolean send(Message<?> message, long timeout) {Assert.notNull(message, "Message must not be null");Message<?> messageToUse = message;ChannelInterceptorChain chain = new ChannelInterceptorChain();boolean sent = false;try {messageToUse = chain.applyPreSend(messageToUse, this);if (messageToUse == null) {return false;}sent = sendInternal(messageToUse, timeout);chain.applyPostSend(messageToUse, this, sent);chain.triggerAfterSendCompletion(messageToUse, this, sent, null);return sent;}catch (Exception ex) {chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);}catch (Throwable err) {MessageDeliveryException ex2 =new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);throw ex2;}}
  • 构造了一个拦截链,在发送前,可以进行前置处理和后置处理。这个拦截链就是扩展的关键了。我们可以定义自己的拦截器,在发送消息前后进行拦截处理。这里spring给我们的扩展点。
  • 通过sendInternal将消息发送出去

再来看下sendInternal方法,进入子类ExecutorSubscribableChannel

//ExecutorSubscribableChannel@Overridepublic boolean sendInternal(Message<?> message, long timeout) {for (MessageHandler handler : getSubscribers()) {SendTask sendTask = new SendTask(message, handler);if (this.executor == null) {sendTask.run();}else {this.executor.execute(sendTask);}}return true;}

可以看到,通过这个Channel,找到messageHandler,这个messageHandler有多个,依次将消息进行处理。
在这里插入图片描述
这里取到的有两个messageHandler

  • SimpleBrokerMessageHandler
  • UserDestinationMessageHandler

进入SendTask,看一下run方法

//
public void run() {Message<?> message = this.inputMessage;try {message = applyBeforeHandle(message);if (message == null) {return;}this.messageHandler.handleMessage(message);triggerAfterMessageHandled(message, null);}catch (Exception ex) {triggerAfterMessageHandled(message, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;throw new MessageDeliveryException(message, description, ex);}catch (Throwable err) {String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);triggerAfterMessageHandled(message, ex2);throw ex2;}
}

这里的关键点是:this.messageHandler.handleMessage(message);
首先会进入SimpleBrokerMessageHandler的handleMessage
在这里插入图片描述
可以看到,这里直接跳出去了。
SimpleBrokerMessageHandler的作用就是,看是不是我们配置的广播消息的前缀,要满足这个条件,才能发送消息。我们配置的前缀是/topic,/queue,这里destination前缀是/user,所以提前返回,不处理。
然后,我们还有一个UserDestinationMessageHandler会继续处理。

在这里插入图片描述
这里对destination进行了处理,发现生成了一个result对象,这里解析出一个targetDestinations,可以看到我们的destination变成了下面的样子
/topic/answer-usero2zuy4zg

  • 这个的构成实际上就是把/user前缀去掉
  • 然后加上-user,后面加上sessionId,就是当前会话的id
  • 最后再以这个新生成的targetDestination,将消息发送出去!
    在这里插入图片描述

这里的messagingTemplate,就是SimpMessagingTemplate。又会回到上面分析的代码。

  • SimpMessagingTemplate调用messageChannel来发送消息
  • messageChannel中会取得两个messageHandler来处理。
    像不像递归调用?
    不过这一次由于我们的destination已经变成了/topic/answer-usero2zuy4zg。这时候,在进入SimpleBrokerMessageHandler时,情况就不一样了

在这里插入图片描述
由于destination变成了/topic开头的,此时我们不会跳出去,会找到用户(-user后面跟了SessionId)订阅,将消息发送出去

可以看到,我们找到了一个用户订阅。在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

其实是每个用户订阅时,会将/user前缀去掉,将用户的destination改写成了如下形式,
/user/topic/hello->/topic/hello-user{sessionId}
所以,经过UserDestinationMessageHandler处理,改写后的destination可以通过destination找到用户会话,将此消息发送出去。
到此,我们的用户消息的发送就分析完了

总结

发送用户消息的整个过程如下:

  • SimpMessageSendingOperations.convertAndSendToUser接口发送用户消息,这里不传/user前缀,注意一下
  • 接着SimpMessagingTemplate进行消息的发送
  • SimpMessagingTemplate会交由MessageChannel
  • MessageChannel将会调用MessageHandler来处理消息,有以下两个MessageHandler
    • SimpleBrokerMessageHandler
    • UserDestinationMessageHandler
  • 经过MessageHandler的处理,destination由/user/topic/answer,变成了/topic/answer-usero2zuy4zg。
  • 改写后的destination可以找到用户会话,将此消息发送出去

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3019602.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

U盘管控软件,禁止员工用U盘拷贝机密数据,防止信息通过U盘泄露

随着信息技术的不断发展&#xff0c;U盘等便携式存储设备已成为我们日常工作中不可或缺的工具。然而&#xff0c;随着U盘的普及&#xff0c;企业面临的信息泄露风险也在不断增加。为了确保企业的信息安全&#xff0c;许多企业开始采用U盘管控软件&#xff0c;禁止员工使用U盘拷…

内容自动化的进阶之路:Kompas.ai带你走进智能创作时代

在数字化媒体的浪潮中&#xff0c;内容创作和管理正变得越来越复杂和挑战性。为了应对这一挑战&#xff0c;内容自动化技术应运而生&#xff0c;它通过使用人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;算法&#xff0c;自动化内容创作的多个环节&a…

ubuntu安装mysql本地navicat连接使用

ubuntu安装mysql&#xff0c;选择在线安装非常快&#xff1a; 安装 sudo apt install -y mysql-server-8.0先下载资源&#xff08;指定版本下载&#xff09; 如果下不下来&#xff0c;遇到报错多半是 工具需要更新了 sudo apt update更新一下即可&#xff08;sudo就是权限更…

静态分析-RIPS-源码解析记录-02

这部分主要分析scanner.php的逻辑&#xff0c;在token流重构完成后&#xff0c;此时ini_get是否包含auto_prepend_file或者auto_append_file 取出的文件路径将和tokens数组结合&#xff0c;每一个文件都为一个包含require文件名的token数组 接着回到main.php中&#xff0c;此时…

【Linux网络编程】4.TCP协议、select多路IO转换

目录 TCP协议 TCP通讯时序 三次握手 四次挥手 滑动窗口 测试代码1 测试结果 Address already in use解决方法 批量杀进程 测试代码2 测试结果 测试代码4 测试结果 TCP状态转换 主动发起连接请求端 主动关闭连接请求端 被动接收连接请求端 被动关闭连接请求端…

使用Docker安装MySQL5.7.36

拉取镜像并查看 docker pull mysql:5.7.36拉取成功后查看&#xff08;非必须&#xff09; docker images创建并设置宿主机 mysql 配置文件目录和数据文件目录 创建相关文件夹将容器中的mysql数据保存到本地&#xff0c;这样即使容器被删除&#xff0c;数据也不会丢失。 mkd…

【微积分听课笔记】全微分,二元极值,Double Integral

6.6 二元函数的极值_哔哩哔哩_bilibili 此笔记为听课笔记&#xff0c;宋浩老师微积分~ 最近诸事缠身&#xff0c;会有种会不会只做一件事好些。实际上&#xff0c;关键在于动力&#xff0c;我不可能每次都准备充分。动力&#xff0c;分配&#xff0c;这是目前进入大学我正在学…

中仕公考:2024年甘肃兰州事业单位招聘公告

兰州市7个县区和市属48家事业单位计划面向社会公开招聘工作人员536名(管理岗位82名&#xff0c;专业技术岗位447名&#xff0c;工勤技能岗位7名)。 应聘人员年龄要求18周岁以上(2006年5月7日以前出生) 报名&#xff1a;2024年5月11日至5月15日(上午9:00至下午17:00) 资格审查…

第9章 负载均衡集群日常维护

一个设计良好的高可用负载均衡集群&#xff0c;交付使用以后并不能一劳永逸。欲使其高效、稳定、持续对外服务&#xff0c;日常维护必不可少。 对于高可用负载均衡集群来说&#xff0c;有两种类型的维护形式&#xff1a;常规性维护与突发性维护。突发性维护一般指故障处理&…

【每日刷题】Day34

【每日刷题】Day34 &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;每日刷题&#x1f34d; &#x1f33c;文章目录&#x1f33c; 1. 1047. 删除字符串中的所有相邻重复项 - 力扣&#xff08;LeetCode&#xff09; 2. 1475. 商品折扣后…

俄罗斯副总理暗示欧佩克+或增加原油产量,亚洲早盘油价小幅下跌

在俄罗斯副总理亚历山大诺瓦克暗示欧佩克可能采取行动增加原油产量后&#xff0c;亚洲早盘的油价出现小幅下跌。这一消息引起了市场对原油供给增加的担忧&#xff0c;导致油价走低。 City Index和FOREX.com的市场分析师Fawad Razaqzada表示&#xff0c;虽然原油价格在技术上尚…

JAVA(三)常用类和API

目录 常用类与基础API---String String的内存结构 构造器和常用方法 字符串构建 String与其他结构间的转换 String的常用API 系列1&#xff1a;常用方法 系列2&#xff1a;查找 系列3&#xff1a;字符串截取 系列4&#xff1a;和字符/字符数组相关 系列5&#xff1a;开头…

vitis 2020.1 Up date XSA文件后,编译不通过

原来是可以编译通过的&#xff0c;升级XSA文件后&#xff0c;出现各种问题&#xff0c;pmufw没法编译通过 xpfw_config.h:14:10: fatal error: xparameters.h: No such file or directory Vitis 2020.2 - fatal error: xparameters.h: No such file or directory (xilinx.com)…

vscode与git下载安装

粉丝不过W git下载地址: https://git-scm.com/downloads, 安装git时, 记住你安装Git的路径 vscode下载地址: https://code.visualstudio.com/ 下载完后, 并默认安装好, 你就可以进入配置git的环境变量了, 点击win, 点击设置 在搜索框里搜索, 高级系统设置 点到 高级 , 然后点击…

打造文旅客运标杆!吐鲁番国宾旅汽携苏州金龙升级国宾级出行体验

新疆&#xff0c;这片神秘的大地&#xff0c;从无垠沙漠到高耸天山&#xff0c;从古老丝路到繁华都市&#xff0c;处处都散发着独特的魅力&#xff0c;吸引着四面八方的游客。据新疆维吾尔自治区文化和旅游厅数据显示&#xff0c;刚刚过去的“五一”小长假&#xff0c;新疆全区…

开放原子龙蜥社区 2 大学习赛首批获奖者名单公布

近日&#xff0c;开放原子开源基金会联合龙蜥社区推出「人人都可以参与开源」和「基于 ECS Intel 实例部署 GPT-2 大语言模型」两大学习赛&#xff0c;此赛题长期有效&#xff0c;且有开放原子开源基金会和龙蜥社区共同提供的丰厚双重奖励机制。赛题一经发布&#xff0c;吸引了…

线程池(一)

1.线程池的基本概念 1.1 什么是线程池&#xff1a; 线程池是一种利用池化技术思想来实现的线程管理技术&#xff0c;主要是为了复用线程、便利地管理线程和任务、并将线程的创建和任务的执行解耦开来。我们可以创建线程池来复用已经创建的线程来降低频繁创建和销毁线程所带来的…

EIA预测2024年全球石油市场供求平衡,非欧佩克产油国将抵消欧佩克减产影响

美国能源信息署&#xff08;EIA&#xff09;预测&#xff0c;2024年全球石油市场将达到供求平衡状态&#xff0c;主要原因是非欧佩克国家的原油产量增长将抵消欧佩克近期的减产。根据EIA周二公布的短期能源展望报告&#xff0c;全球原油供应量预计将增加至1.0276亿桶/日。而与此…

Linux学习之高级IO

之前的内容我们基本掌握了基础IO&#xff0c;如套接字&#xff0c;文件描述符&#xff0c;重定向&#xff0c;缓冲区等知识都是文的基本认识&#xff0c;而高级IO则是指更加高效的IO。 对于应用层&#xff0c;在读写的时候&#xff0c;本质就是把数据写给OS&#xff0c;若一方…

一季度盈利大增65.62%,神州泰岳游戏表现抢眼

易采游戏网5月8日消息&#xff0c;近日国内知名游戏上市公司神州泰岳公布了其2023年一季度的财务报告&#xff0c;报告显示&#xff0c;公司一季度盈利大增65.62%&#xff0c;这一数字远超过市场预期&#xff0c;引发了业界的广泛关注。 神州泰岳此次盈利大增&#xff0c;主要得…