基于MQTT协议实现微服务架构事件总线

一、场景描述

昨天在博客《客户端订阅服务端事件的实现方法》中提出了利用websocket、服务端EventEmitter和客户端mitt实现客户端订阅服务端事件,大大简化了客户端对服务端数据实时响应的逻辑。上述方案适用于单服务节点的情形。

对于由服务集群支撑的微服务架构,websocket提供的点对点通信已无法满足前端订阅后端集群事件的需求,升级方案是使用基于消息总线的通信方式。

在这里插入图片描述

二、几种消息总线适用性比较

常用的消息总线包括Kafka、Redis和基于MQTT协议实现的EMQX。

Kafka和MQTT都是从发布/订阅系统演化而来,但发展侧重点不同。Kafka通过分布式架构提供了海量数据流的存储,并保证数据流顺序,它的设计目标是支持数据发布、订阅和存储。而MQTT用于网络中传输小型数据包,其设计目的是实现简单、可靠的设备间通信。

而Redis是从内存数据库系统演化而来,发布/订阅功能是把消息保存在内存中。与Kafka相比,其只能提供半持久化;与MQTT相比,其通信效率较低。

由于应用场景没有对消息持久化的需求,且考虑到产业大脑平台未来会接入工业互联网,使用MQTT协议来搭建事件总线更利于平台在工业互联网环境下的扩展。

三、MQTT简介

几年前,本人曾写过MQTT简介。本文摘抄其中重要概念。

(一)MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:

  • 至多一次:
    消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。应用场景如环境传感器的数据采集,丢失一次记录无所谓,因为不久后还会有第二次发送。
  • 至少一次:
    确保消息送达订阅者,但消息可能重复,适用于幂等性操作。
  • 只有一次:
    最严格的消息服务质量,确保消息到达且仅到达一次订阅者。应用场景如计费系统等。

MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。

MQTT.png

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:

  • connect:客户端建立与服务器的连接
  • disconnect:等待客户端完成工作后,端口与总线的会话
  • subscribe:客户端向消息总线注册订阅主题
  • unsubscribe:客户端等待消息总线取消所注册的订阅
  • publish:客户端向消息总线发送某主题的消息

(二)开源消息总线EMQX

EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,软件主页为https://www.emqx.io/。可根据操作系统类别选择不同版本下载安装,或通过docker部署。

软件安装后,通过 emqx start以后台方式启动。启动后将会开放两个端口:

  • 18083端口为控制台端口,可通过浏览器访问该端口,首次登录的用户名和密码为admin和public。控制台提供了总线监控、用户权限管理、在线客户端订阅/发布等功能。
  • 8083端口为通信端口,MQTT客户端可通过该端口与EMQX消息总线通信。

(三)MQTT.js客户端

MQTT.js是MQTT客户端Nodejs SDK,可在浏览器(ES模块)和Node.js环境(CommonJS模块)下使用,前者可通过MQTT over WebSocket使用,后者既可以通过MQTT over WebSocket使用,也可以直接使用MQTT。区别仅仅是连接参数的协议头不同。

1. 安装和帮助文件

$ pnpm i mqtt -S #安装
$ npx mqtt help  #帮助
MQTT.js command line interface, available commands are:* publish     publish a message to the broker* subscribe   subscribe for updates from the broker* version     the current MQTT.js version* help        help about commandsLaunch 'mqtt help [command]' to know more about the commands.

2. 使用方法

// const mqtt = require('mqtt') //ES模块
import mqtt from 'mqtt'  //CommonJS模块// 连接选项
const options = {clean: true, // true: 清除会话, false: 保留会话connectTimeout: 4000, // 超时时间// 认证信息clientId: 'user_id', // 要保证唯一性// 若在控制台配置了用户名和密码:// username: 'xxx',// password: 'xxx',
}// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接
// wss 加密 WebSocket 连接
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = 'ws://localhost:8084/mqtt'
const client = mqtt.connect(connectUrl, options)client.on('reconnect', error => {console.error('正在重连:', error)
})client.on('error', error => {console.error('连接失败:', error)
})//收到消息
client.on('message', (topic, message) => {console.log('收到消息:', topic, message.toString()) //message是二进制流,需要转换成字符串
})//订阅主题
const topic='/user_id/#'
const qos=0  //0:最多交付1次;1:至少交付1次;2:只交付1次
client.subscribe(topic, qos, error=>{  //订阅user_id主题下所有消息if(error){console.error('订阅主题失败:', error)return}console.log('订阅成功')
})//发布消息
client.publish('user_id/a',JSON.stringify({a:123}),qos,error=>{if(error){console.error('发布消息失败:', error)}
})//取消订阅
client.unsubscrib(topic, qos, error=>{if(error){console.error('取消订阅失败', error)return}
})//断开连接
if(client.connected){try{client.end(false,()=>{console.log('成功断开连接')})catch(error){console.error('断开连接失败:', error)}}
}

(四)安全性

1. 排它订阅

排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。

2. JWT认证

系统整体采用JWT认证方式,通过一台认证服务器颁发JWT Token。MQTT客户端访问EMQX总线时,携带由认证服务器颁发的JWT Token。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2814666.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Apple的这篇人工智能论文提出了声学模型融合,用以大幅降低语音识别系统中的单词错误率

Apple人工智能论文在提高自动语音识别 (ASR) 系统的准确性和效率方面取得了重大改进。最近的研究深入探讨将外部声学模型 (AM) 集成到端到端 (E2E) ASR 系统中,提出了一种解决域不匹配这一持续挑战的方法,这是语音识别技术中的常见障碍。Apple的这种方法…

【BUG 记录】史诗级 BUG - MYSQL 删库删表却没有备份如何恢复数据

【BUG 记录】史诗级 BUG - MYSQL 删库删表却没有备份如何恢复数据 1. 问题描述2. 解决方案(binlog)2.1 构造测试环境2.2 查看 MySQL 环境是否开启 binlog2.3 查看所有的 binlog 日志记录2.4 查看当前正在使用的是哪一个 binlog 文件2.5 查看此时的 binlo…

springboot整合websocket,入门学习

websocket 1.简介2.常见的消息推送方式2.1轮询方式2.1.1短轮询2.1.2长轮询 2.2 SSE(server-sent event):服务器发送事件2.3 websocket 3.原理解析4.websocket API4.1客户端(浏览器)API4.2服务端API 5.实现1.流程分析2.消息格式3.代码实现 1.简介 websoc…

如何用好应用权限,保护隐私数据?银河麒麟桌面操作系统V10 SP1 2303 update2新功能解析

为您介绍银河麒麟桌面操作系统V10 SP1 2303 update2隐私设置和权限管理功能,为您的个人数据安全保驾护航。 说到个人数据隐私,在科技重塑生活本质的数字世界,个人信息遭受持续威胁。2018年,某国际知名社交平台因安全系统漏洞而遭…

OpenAI划时代大模型——文本生成视频模型Sora作品欣赏(八)

Sora介绍 Sora是一个能以文本描述生成视频的人工智能模型,由美国人工智能研究机构OpenAI开发。 Sora这一名称源于日文“空”(そら sora),即天空之意,以示其无限的创造潜力。其背后的技术是在OpenAI的文本到图像生成模…

docker 容器修改端口和目录映射

容器修改端口映射 一般在运行容器时,我们都会通过参数 -p(使用大写的-P参数则会随机选择宿主机的一个端口进行映射)来指定宿主机和容器端口的映射,例如 docker run -it -d --name [container-name] -p 8088:80 [image-name]这里…

数据结构:循环队列

一、队列的概念 操作受限的线性表,允许在队列的一端执行入队操作,另一端执行出队操作 先进先出(FIFO) 1.顺序队列 物理结构连续,依赖于数组实现 队列中有一个队头指针和队尾指针,队头指针保存每次要出队的元素,队…

网站三合一缩略图片介绍展示源码

网站三合一缩略图片介绍展示源码,PHP源码,运行需要php环境支持,效果截图如下 蓝奏云下载:https://wfr.lanzout.com/ihY8y1pgim6j

Springboot+vue的考务报名平台(有报告)。Javaee项目,springboot vue前后端分离项目。

演示视频: Springbootvue的考务报名平台(有报告)。Javaee项目,springboot vue前后端分离项目。 项目介绍: 本文设计了一个基于Springbootvue的前后端分离的考务报名平台,采用M(model&#xff0…

新加坡服务器托管:开启全球化发展之门

新加坡作为一个小国家,却在全球范围内享有极高的声誉。新加坡作为亚洲的科技中心,拥有先进的通信基础设施和成熟的机房托管市场。除了其独特的地理位置和发达的经济体系外,新加坡还以其开放的商业环境和便利的托管服务吸引着越来越多的国际公…

Python手册(Machine Learning)--LightGBM

Overview LightGBM(Light Gradient Boosting Machine)是一种高效的 Gradient Boosting 算法, 主要用于解决GBDT在海量数据中遇到的问题,以便更好更快的用于工业实践中。 数据结构说明lightgbm.DatasetLightGBM数据集lightgbm.Bo…

【前端素材】推荐优质后台管理系统Salreo平台模板(附源码)

一、需求分析 当我们从多个层次来详细分析后台管理系统时,可以将其功能和定义进一步细分,以便更好地理解其在不同方面的作用和实际运作。 1. 结构层次 在结构层次上,后台管理系统可以分为以下几个部分: a. 辅助功能模块&#…

项目分享|基于ELF 1开发板的车牌识别系统

该项目选用ElfBoard ELF 1开发板作为核心硬件平台,利用USB接口连接的摄像头捕捉并识别车牌信息。一旦车牌成功识别,系统会触发绿灯指示,并将识别所得的车牌号码实时传输至手机APP。车牌识别技术方面,借助了百度提供的OCR&#xff…

Java+SpringBoot+Vue+MySQL:狱内罪犯危险性评估系统全栈开发

✍✍计算机毕业编程指导师 ⭐⭐个人介绍:自己非常喜欢研究技术问题!专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目:有源码或者技术上的问题欢迎在评论区一起讨论交流! ⚡⚡ Java、…

postman访问k8s api

第一种方式: kubectl -n kubesphere-system get sa kubesphere -oyaml apiVersion: v1 kind: ServiceAccount metadata:annotations:meta.helm.sh/release-name: ks-coremeta.helm.sh/release-namespace: kubesphere-systemcreationTimestamp: "2023-07-24T07…

vue2后台管理系统demo,包含增删查改、模糊搜索、分页

因一直敲小程序,vue不熟练,自己练手项目,就包含增删查改以及模糊搜索分页 一、页面简单但功能齐全 二、数据是mock模拟 三、启动步骤 1、 json-server --watch data.json 启动mock数据 2、npm i 下载依赖 3、npm run serve 四、github地址…

最新IE跳转Edge浏览器解决办法(2024.2.26)

最新IE跳转Edge浏览器解决办法(2024.2.26) 1. IE跳转原因1.1. 原先解决办法1.2. 最新解决办法1.3. 最后 1. IE跳转原因 关于IE跳转问题是由于在2023年2月14日,微软正式告别IE浏览器,导致很多使用Windows10系统的电脑在打开IE浏览…

PHP请求示例获取淘宝商品详情数据API接口(按关键词搜索商品列表)

请求示例,API接口接入Anzexi58 item_get-获得淘宝商品详情 taobao.item_get 公共参数 名称类型必须描述keyString是调用key(必须以GET方式拼接在URL中)secretString是调用密钥WeChat18305163218api_nameString是API接口名称(包…

存储卡0字节危机:原因解析与数据拯救之道

存储卡0字节现象揭秘 在数字时代,存储卡作为我们存储重要数据的主要工具之一,一旦遭遇“0字节”的困境,无疑是一场数据灾难。所谓存储卡0字节,即存储卡的文件系统显示容量为0,所有文件仿佛凭空消失,用户无…

Ubuntu服务器fail2ban的使用

作用:限制ssh远程登录,防止被人爆破服务器,封禁登录ip 使用lastb命令可查看到登录失败的用户及ip,无时无刻的不在爆破服务器 目录 一、安装fail2ban 二,配置fail2ban封禁ip的规则 1,进入目录并创建ssh…