【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统

用Redis实现消息队列并搭建分布式邮件消息系统

  • 系统介绍
  • Redis实现消息队列
    • 思路分析
    • 代码实现
  • MongoDB监听数据变化
    • 思路分析
    • 代码实现
      • Mongoose测试连接
      • 监听mongodb数据变化
  • 注意点

系统介绍

本次要实现的是一个能够实现实时监控Mongodb中数据变化的系统,要能够在数据发生变动的时候实时将变动消息发送给指定的邮箱。

  • Node.js:用于开发的语言,既能用于前端开发,又能用来做后端开发。
  • Redis:用于搭建消息队列,实现消息的分布式。
  • MongoDB:持久化数据,同时实现触发条件的监听,当MongoDB中有新增数据的时候发送新增数据的邮件消息。

Redis实现消息队列

思路分析

主要使用的就是Redis-smq这个库,下面展示的就是主要使用的消息队列类,其中包括了很多队列种类,有先进先出、优先级先出等方式。
在这里插入图片描述
整个库的原理如下结构图,本次使用到的只有主线,就是发送和接收:
在这里插入图片描述

代码实现

const { transemail } = require('../email_list/email.js');
const redis = require('promise-redis-client');
const redisHost = 'localhost';
const redisPort = 6379;// 配置 Redis 客户端
const createRedisClient = () => {return new Promise((resolve, reject) => {let client = redis.createClient({ host: redisHost, port: redisPort });client.on('error', err => {console.log('Redis 连接出错');reject(err);});client.on('ready', () => {console.log('Redis ready');resolve(client);});});
};async function startWaitMsg(redisClient) {while (true) {let res = null;try {res = await redisClient.brpop('bookChanges', 0);console.log('收到消息', res);} catch (err) {console.log('brpop 出错,重新 brpop');continue;}res = res.toString();transemail(res);}
}async function listenredis() {try {// 启动生产者// startProducer();// 创建 Redis 客户端const redisClient = await createRedisClient();// 启动消息监听startWaitMsg(redisClient);} catch (error) {console.error('Error:', error);}
}
//测试的时候使用的代码
listenredis().catch(console.error);// 处理退出信号以关闭客户端
process.on('SIGINT', async () => {console.log('Closing clients...');process.exit(0);
});

MongoDB监听数据变化

思路分析

由于要实现实时检测,经过分析以后使用mongoose中的数据流监控最为合适,但是要实现这个方法需要用到watch方法,这个方法只有在mongodb有副本集的时候才能使用,因此还需要提前配置好mongodb才能进行这里下一步的操作,如果没有配置过mongodb的副本集的可以参考我的这篇博客。

  1. 用mongoose中的watch连接mongodb副本集数据库获取数据变化。
  2. 将数据变化发送到redis消息队列中。

首先在命令行中将服务启动:
在这里插入图片描述

代码实现

Mongoose测试连接

const mongoose = require('mongoose');mongoose.connect('mongodb://localhost/test', {useNewUrlParser: true,useUnifiedTopology: true
}).then(() => {console.log('Successfully connected to MongoDB');const bookSchema = new mongoose.Schema({title: String,author: String});const Book = mongoose.model('Book', bookSchema);const bookChangeStream = Book.watch();bookChangeStream.on('change', (change) => {console.log('Collection changed:', change);if (change.operationType === 'insert') {console.log('New book added:', change.fullDocument);}});
}).catch((error) => {console.log('Error connecting to MongoDB:', error);
});

在这里插入图片描述
测试结果:
在Mongo Campass中添加数据以后,在终端中出现如下消息:
在这里插入图片描述
证明测试成功,可以进行下一步操作啦!

监听mongodb数据变化

const redis = require('redis');
const mongoose = require('mongoose');
// 创建 Redis 客户端
const redisClient = redis.createClient({host: 'localhost',port: 6379});// 连接到 Redis
redisClient.connect();//连接mongodb数据库并检测变化发送到redis消息队列
async function connectAndMonitorMongoDB(redisClient) {try {await mongoose.connect('mongodb://localhost/test', {useNewUrlParser: true,useUnifiedTopology: true});console.log('Successfully connected to MongoDB');const bookSchema = new mongoose.Schema({title: String,author: String});const Book = mongoose.model('Book', bookSchema);const bookChangeStream = Book.watch();try{bookChangeStream.on('change', (change) => {console.log('Collection changed:', change);console.log("type of change:",typeof(change));msg = JSON.stringify(change.fullDocument);msg = msg.replace(/{|}/g, '');msg = "New message received:"+msg;console.log("massage:",msg);console.log("type of message:",typeof(msg));if (change.operationType === 'insert') {console.log('New book added:', msg);redisClient.lPush('bookChanges', msg, function(err, reply) {if (err) {console.log('Error storing JSON to Redis:', err);} else {console.log('JSON stored successfully, list length:', reply);}})}});}catch (err){console.log("error while loading data into redis:", err)}} catch (error) {console.log('Error connecting to MongoDB:', error);}
}// module.exports = { connectAndMonitorMongoDB };
async function main() {try {await connectAndMonitorMongoDB(redisClient);console.log('Monitoring MongoDB changes...');} catch (error) {console.error('Failed to start monitoring:', error);}
}main();

注意点

在nodejs中将JSON对象转换成字符串的JSON.Stringify函数并不是严格的转换成字符串而是带有一个大括号,然而这个在进行redis进队列的时候会有问题,因此需要用正则表达式去掉大括号:

msg = JSON.stringify(change.fullDocument);
msg = msg.replace(/{|}/g, '');
msg = "New message received:"+msg;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3245728.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

VGMShield:揭秘视频生成模型滥用的检测与追踪技术

人工智能咨询培训老师叶梓 转载标明出处 视频生成模型,如 Stable Video Diffusion 和 Videocrafter,已经能够生成合理且高分辨率的视频。但这些技术进步也带来了被恶意利用的风险,比如用于制造假新闻或进行政治宣传。因此,来自弗…

彩电上自带的推箱子游戏是什么编程语言开发的?

2000年左右的厦新彩电上,自带了推箱子、华容道游戏。界面如下: 在线版推箱子游戏,网址:https://www.tuixiangzi.cn/ BASIC,全称是Beginners All-purpose Symbolic Instruction Code,含义是初学者通用符号…

【杰理蓝牙开发】AC695x 按键扫描接口分析

【杰理蓝牙开发】AC695x 按键ADC接口分析 0. 个人简介 && 授权须知1. 按键扫描配置和按键消息处理1.1 参数说明1.2 按键事件说明2. 应用层处理3. 特殊按键需求3.1 特殊需求 1:组合键3.2 特殊需求 2:按键多击事件3.3 特殊需求 3:某些按键只响应单击事件0. 个人简介 &…

AI算法20-分位数回归算法Quantile Regression | QR

分位数回归算法的概念 分位数回归算法简介 分位数回归(Quantile Regression)是一种统计方法,最早由Roger Koenker和Gilbert Bassett于1978年提出。它通过估计条件分位数函数来分析自变量与因变量之间的关系,与传统的最小二乘回归…

怎么压缩视频文件?简单的压缩视频方法分享

视频已成为我们日常生活中不可或缺的一部分。但随着视频质量的提高,文件大小也逐渐成为我们分享的阻碍。如何有效压缩视频文件,使其既能保持清晰,又能轻松分享?今天,给大家分享五种实用的视频压缩方法,快来…

昇思25天学习打卡营第02天|张量 Tensor

一、什么是张量 Tensor 张量是一种特殊的数据结构,与数组和矩阵非常相似。张量(Tensor)是MindSpore网络运算中的基本数据结构。 张量可以被看作是一个多维数组,但它比普通的数组更加灵活和强大,因为它支持在GPU等加速…

项目JetCache的常见配置与使用

Hello, 大家好,今天本汪给大家带来的是JetCache在项目中的常见配置与用法讲解,接下来,随本汪一起来看看吧 一、介绍 官网地址:https://github.com/alibaba/jetcache JetCache 是一种 Java 缓存抽象,它为不同的缓存…

腾讯PAG动效工具解析

什么是PAG? 1、背景 在终端 APP 中,动画非常常见,它可以辅助视觉制造焦点,同时也可以让用户交互更加顺滑,但动画的实现却是设计师和研发群体的一个痛点。如何辅助设计师设计高性能炫酷的动画、如何将设计师设计的动画…

自托管端口管理系统Portall

老苏一直在折腾各种开源软件,但总是记不清哪些应用占用了哪些端口,每次都是先随机想一个端口,然后在笔记中搜索,看有没有被占用过。Portall 就是用来解决老苏遇到的这种情况的,当然,excel 也是可以的 &…

十分钟“手撕”七大排序

前言:可以通过目录来找你需要的排序的源代码。先是解释底层原理,后附带代码。 目录 稳定的概念 一、插入排序 二、希尔排序 三、选择排序 四、堆排序 五、冒泡排序 六、快速排序 七、归并排序 八、排序总结 额外:计数排序 稳定的…

Qt MV架构-委托类

一、基本概念 与MVC模式不同,MV视图架构中没有包含一个完全分离的组件来处理与用户的交互。 一般地,视图用来将模型中的数据显示给用户,也用来处理用户的输入。为了获得更高的灵活性,交互可以由委托来执行。 这些组件提供了输入…

gradle学习及问题

一、下载安装 参考:https://blog.csdn.net/chentian114/article/details/123344839 1、下载Gradle并解压 安装包:gradle-6.7-bin.zip 可以在idea的安装目录查看自己适配的版本 路径:D:\IDEA2021.3\plugins\gradle\lib 下载地址&#xff1a…

16_网络IPC2-寻址

进程标识 字节序 采用大小模式对数据进行存放的主要区别在于在存放的字节顺序,大端方式将高位存放在低地址,小端方式将高位存放在高地址。 采用大端方式进行数据存放符合人类的正常思维,而采用小端方式进行数据存放利于计算机处理。到目前…

python用selenium网页模拟时xpath无法定位元素解决方法2

有时我们在使用python selenium xpath时,无法定位元素,红字显示no such element。上一篇文章写了1种情况,是包含iframe的,详见https://blog.csdn.net/Sixth5/article/details/140342929。 本篇写第2种情况,就是xpath定…

Linux 线程初步解析

1.线程概念 在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列。在linux中,由于线程和进程都具有id,都需要调度等等相似性,因此都可以用PCB来描述和控制,线程含有PCB&am…

人类或是低等生物?

自工业革命以来,人类对自然资源的消耗日益加剧,引发了对未来可持续性的深刻担忧。然而,一项振奋人心的发现为人类提供了新的希望——一颗名为LHS 1140 b的超级地球,它位于距离地球约48光年的鲸鱼座,由詹姆斯韦布空间望…

uniapp字符串转base64,无需导入依赖(多端支持)

使用示例 import { Base64Encode, Base64Decode } from "@/utils/base64.js" base64.js const _keyStr = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";export const Base64Encode = (text)

Qt Creator的好用的功能

(1)ctrlf: 在当前文档进行查询操作 (2)f3: 找到后,按f3,查找下一个 (3)shiftf3: 查找上一个 右键菜单: (4)f4:在…

使用vcXsrv可视化pcl文件

1、下载vcXsrc程序 2、按下面步骤配置 3、按上面操作后,在运行菜单就能看到它在运行了 4、去wsl中配置,即设置环境变量 vim ~/.bashrc # 设置连接windows的VcXsrv export DISPLAY192.168.1.100:0.0 #(192.168.1.100是我windows的ip&#x…

信创学习笔记(四),信创之数据库DB思维导图

创作不易 只因热爱!! 热衷分享,一起成长! “你的鼓励就是我努力付出的动力” 一. 信创学习回顾 1.信创内容 信创内容思维导图 2.信创之CPU芯片架构 信创之CPU芯片架构思维导图 3.信创之操作系统OS 信创之操作系统OS思维导图 二. 信创之国产数据库DB思维导图 …