实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题

目录

1. 准备工作

2. 将需要缓存的数据存储 Redis

3. 监听 canal 存储在 Kafka Topic 中数据


1. 准备工作

1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini

log-bin="HELONG-bin"
binlog_format=ROW     # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
binlog-do-db=aicloud    # 监控 AI Cloud 项目

如果要同步多个项目:

binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3

2. 重启MySQL服务

3. 赋值数据同步权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 安装并配置 Canal

下载地址:https://github.com/alibaba/canal/releases

① 修改canal.properties

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092

canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),

可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。

② 修改instance.properties

canal.instance.mysql.slaveId=100   # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka

slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据

(主节点的 slaveId = 1)

address 配置连接本地的 MySQL

topic 配置数据发送到 Kafka 的某个主题下

5. 拷贝 Jar 包到 lib

将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。

6. 删除 bin 目录下 startup.bat 里的参数

如果启动时报错:

Unrecognized VM option 'PermSize=128m'

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.

删除 -XX:PermSize=128m 参数即可。

7. 启动 canal

打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车

2. 将需要缓存的数据存储 Redis

此时我将这个查询列表接口的数据,存储在 Redis 中:

/*** 获取历史聊天记录(对话/绘图)** @param type* @return {@link ResponseEntity }*/
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);Object list = redisTemplate.opsForValue().get(listCacheKey);if (ObjectUtil.isNull(list)) {LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());queryWrapper.eq(Answer::getType, type);queryWrapper.eq(Answer::getModel, model);queryWrapper.orderByDesc(Answer::getAid);List<Answer> answerList = answerService.list(queryWrapper);List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());// 缓存 1 天redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);return ResponseEntity.success(answerVoList);} else {return ResponseEntity.success(list);}
}
/*** 查询列表存储 Redis 缓存** @param uid* @param model* @param type* @return {@link String }*/
public static String getListCacheKey(Long uid, Integer model, Integer type) {return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}

3. 监听 Kafka Topic 中数据并删除 Redis 缓存

首先对数据库中需要缓存的数据进行一些修改操作:

此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:

执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”

执行删除操作:

由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的  topic 并针对 data 中的数据进行一个提取,得到一个  cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。

【代码示例】

/*** canal 监控 binlog 日志,将修改的数据存储 kafka topic 中* 监听 kafka topic 中的数据** @param data* @param ack* @throws JsonProcessingException*/
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);if (map.isEmpty()) {ack.acknowledge();return;}// 匹配上对应的数据库和数据表if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {// 更新缓存 List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);if (!CollectionUtils.isEmpty(list)) {for (Map<String, Object> answerMap : list) {String answerListCacheKey = RedisUtil.getListCacheKey(Long.valueOf(answerMap.get("uid").toString()),Integer.parseInt(answerMap.get("model").toString()),Integer.parseInt(answerMap.get("type").toString()));// 删除缓存,让下一次查询走数据库,并同步缓存redisTemplate.delete(answerListCacheKey);}}}//  手动确认应答ack.acknowledge();
}
/*** canal 同步数据到 kafka*/
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";/*** 数据库,缓存数据一致性的*/public static final String DATABASE_KEY = "database";public static final String TABLE_KEY = "table";public static final String DATA_KEY = "data";public static final String TARGET_DATABASE = "aicloud";public static final String TARGET_TABLE = "answer";

【补充】

kafka ui 下载地址:​​​​​​https://github.com/provectus/kafka-ui/tags

修改配置

kafka:clusters:- name: kafka3_clusterbootstrapServers: 127.0.0.1:9092

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3266773.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

数据库练习——编写触发器及存储过程

1. 触发器 建立两个表:goods(商品表)、orders(订单表) 在商品表中导入商品记录 mysql> create database mydb16_trigger; Query OK, 1 row affected (0.00 sec)mysql> use mydb16_trigger; Database changed mysql> create table goods(-> gid char(8) primary …

系统架构师(每日一练7)

每日一练 1.关于网络延迟正确的是()。答案与解析 A.在对等网络中&#xff0c;网络的延迟大小与网络中的终端数量无关 B.使用路由器进行数据转发所带来的延迟小于交换机, C.使用internet服务器可最大程度地减小网络延迟 D.服务器延迟的主要影响因素是队列延迟和磁盘10延迟 2.以…

idea中项目目录,文件显示不全问题

问题&#xff1a;idea中项目目录显示不全问题 解决办法1&#xff1a; 删除目录中的.idea文件 用idea重新打开文件就行了 办法2&#xff1a;手动导入为maven项目 1. 2. 3. 4.选择要导入的项目&#xff0c;导入为maven

【网络流】——初识(最大流)

网络流-最大流 基础信息引入一些概念基本性质 最大流定义 Ford–Fulkerson 增广Edmons−Karp算法Dinic 算法参考文献 基础信息 引入 假定现在有一个无限放水的自来水厂和一个无限收水的小区&#xff0c;他们之间有多条水管和一些节点构成。 每一条水管有三个属性&#xff1a…

重拾CSS,前端样式精读-函数(颜色,计算,图像和图形)

前言 本文收录于CSS系列文章中&#xff0c;欢迎阅读指正 在计算机编程中&#xff0c;函数有着重要的作用和意义&#xff0c;它可以实现封装&#xff0c;复用&#xff0c;模块化&#xff0c;参数等功能效果&#xff0c;在如何在CSS中写变量&#xff1f;一文带你了解前端样式利…

AI学习记录 - 图像识别的基础入门

代码实现&#xff0c;图像识别入门其实非常简单&#xff0c;这里使用的是js&#xff0c;其实就是把二维数组进行公式化处理&#xff0c;处理方式如上图&#xff0c;不同的公式代表的不同的意义&#xff0c;这些意义网上其实非常多&#xff0c;这里就不细讲了。 const getSpecif…

【YOLOv8系列】图像分类篇----通过YOLOv8实现图像分类功能

最近需要使用YOLOv8对自己的数据集进行训练,从而实现图像分类的功能,因此记录一下整个过程。 YOLOv8的github地址:https://github.com/ultralytics/ultralytics 参考链接:超详细YOLOv8图像分类全程概述:环境、训练、验证与预测详解 文章目录 一、YOLOv8环境搭建二、准备…

电脑QQ录屏功能怎么用?图文教程,轻松掌握电脑录屏

“想问一下大家知道电脑QQ录屏功能怎么打开吗&#xff1f;一直以来我使用电脑QQ截图非常方便&#xff0c;但不知道原来QQ还有录屏功能。希望知道QQ录屏功能使用方法的朋友教一下我好吗&#xff1f;” 今天&#xff0c;就让我带大家一起探索电脑QQ录屏功能怎么用&#xff1f;看…

怎么注册自己的电子邮件地址

无论是在职场上的工作沟通、日常的在线购物、或是订阅各类新闻资讯&#xff0c;电子邮件都是您不可或缺的数字化工具。本文将手把手引导您完成注册过程&#xff0c;从选择服务商到完成所有必要步骤&#xff0c;帮助您轻松拥有自己的电子邮件账户。 一、选择电子邮件服务商 市…

友盟U-APM——优秀的前端性能监控工具

在数字化转型浪潮的推动下,移动应用已成为企业连接用户、驱动业务增长的核心载体。然而,随着应用复杂度的日益提升,用户对于应用性能稳定性的期待也水涨船高。面对应用崩溃、卡顿、加载缓慢等频发问题,如何确保应用的流畅运行,成为产研团队亟待解决的关键挑战。在此背景下,友盟…

常见的CSS属性(一)——字体、文本、边框、内边距、外边距、背景、行高、圆角、透明度、颜色值

一、字体 二、文本 三、边框 四、外边距 五、内边距 六、背景 七、行高 八、圆角 九、透明度 九、颜色值 元素的继承性是指给父元素设置了某些属性&#xff0c;子元素或后代元素也会有作用。 一、字体 “font-*”是字体相关的属性&#xff0c;具有继承性。代码如下&a…

浅谈监听器之简单数据写入

浅谈监听器之简单数据写入 “简单数据写入”&#xff08;Simple Data Writer&#xff09;监听器便是其中之一&#xff0c;它提供了一种简便的方式来将测试结果直接输出到文件中&#xff0c;便于后续的数据分析与处理。 简单数据写入监听器概述 “简单数据写入”监听器&#…

pdf压缩在线免费 pdf压缩在线免费网页版 在线pdf压缩在线免费 免费pdf压缩工具 压缩到最小几种方法详细步骤分享

PDF是当前最为常见的电子文档格式&#xff0c;它可以保护文档不被篡改或复制格式可以保持原格式。然而&#xff0c;因为市面上积攒的PDF文件数量过多&#xff0c;也容易因为体积太大的缘故&#xff0c;致使后面对磁盘存储造成很大的压力&#xff0c;压缩PDF文件能有效缩小其体积…

海上导航技术介绍

导航的目的主要是帮助人们或设备确定自己在地理空间中的位置&#xff0c;从而能够引导飞机、舰船、车辆等沿着设定路线安全、准确地到达目的地。 导航可以提供两类信息&#xff1a;第一类信息为载体自身的运动参数&#xff0c;如用户自己的三维坐标和速度矢量、航向、姿态等信…

【python】PyQt5中QPushButton的用法详细解析与应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

苍穹外卖浏览器前端界面修改

背景&#xff1a; 客户原始方案是期望做一个Spring Boot Vue的饿了么系统&#xff0c;但时间上太仓促&#xff0c;所以建议选择开源的苍穹外码目作为作业提交。 客户接受了建议的方案后&#xff0c;期望对前端页面做一些个性化的定制修改。 过程&#xff1a; 苍穹外卖简单介…

Java面试八股之后Spring、spring mvc和spring boot的区别

Spring、spring mvc和spring boot的区别 Spring, Spring Boot和Spring MVC都是Spring框架家族的一部分&#xff0c;它们各自有其特定的用途和优势。下面是它们之间的主要区别&#xff1a; Spring: Spring 是一个开源的轻量级Java开发框架&#xff0c;最初由Rod Johnson创建&…

git实践汇总【配置+日常使用+问题解决】

**最初配置步骤&#xff1a;** git config --global user.name "yournemae" git config --global user.email "yourmail" git config -l ssh-keygen -t rsa -C “xxx.xxxx.EXTcccc.com” git config --global ssh.variant ssh $ git clone git仓库路径 git…

Python + PyQt 搭建可视化页面(PyCharm)

Python PyQt 搭建可视化页面&#xff08;PyCharm&#xff09; 配置PyQt5环境 1.1 安装PyQt5和PyQt5-tools pip install PyQt5pip install PyQt5-tools1.2 QtDesigner和PyUIC环境的配置 配置QTDesigner&#xff0c;用来打开QT可视化开发工具 在PyCharm中依次打开&#xff1a…

docker 构建 mongodb

最近需要在虚拟机上构建搭建mongo的docker容器&#xff0c;搞了半天老有错&#xff0c;归其原因&#xff0c;是因为现在最新的mango镜像的启动方式发生了变化&#xff0c;故此现在好多帖子&#xff0c;就是错的。 ok&#xff0c;话不多说&#xff1a; # 拉取最新镜像&#xf…