基于SpringBoot实现MySQL与Redis的数据最终一致性

问题场景

在并发场景下,MySQL和Redis之间的数据不一致性可能成为一个突出问题。这种不一致性可能由网络延迟、并发写入冲突以及异常情况处理等因素引起,导致MySQL和Redis中的数据在某些时间点不同步或出现不一致的情况。数据一致性问题的级别可以分为三种:

  • 强一致性:写入何值,读出何值,但在实现中,性能较差。
  • 弱一致性:写入新数据后,承诺在某个时间级别(分、秒、毫秒)后,达到数据一致。
  • 最终一致性:写入新数据后,承诺在规定时间内达到数据一致。

解决方案

强一致性: 强一致性解决方案在高并发场景下实现过于苛刻,本案例暂不讨论。

弱一致性: 一致性的解决方案可以使用“先写MySQL,再删除Redis”策略,这种方案在极限条件下有不一致的可能性,但结合需求和技术实现可以综合评判。弱一致性的应用场景如:社交平台点赞功能,用户可以实时看到点赞的更新,尽管MySQL和Redis可能存在短暂的数据不一致。

最终一致性: 采用“先写MySQL,通过MySQL的Binlog特性,异步写入Redis”。这种方案一般适用于库存、金融等业务场景,但是需要建立相关失败重试、告警、补偿机制,以及容灾措施。

在本案例中,弱一致性采用 Cache Aside 方案,最终一致性采用阿里巴巴开源组件 canal 实现。

Cache Aside

  1. 该方案在读取数据库时,首先从缓存中查询数据库:
    • 如果缓存中存在数据,则直接返回给应用程序。
    • 如果缓存中不存在数据,则从数据库中读取数据,并将数据存储到缓存中,然后返回给应用程序。
  1. 写入数据时,先更数据库的数据,当数据库更新成功后,再删除缓存中的数据。

Cache Aside注意事项
  • 缓存失效:缓存中的数据可能会过期或失效,需要考虑设置合适的缓存过期时间,或使用合适的缓存失效策略(如LRU)来管理缓存中的数据。
  • 缓存穿透:当请求查询一个不存在的数据时,会导致缓存层无法命中,从而直接访问数据库。为了避免缓存穿透问题,可以使用空值缓存或布隆过滤器等技术来减轻数据库的负载。

综上所述,Cache Aside方案适用于读取频率较高、对数据实时性要求不高的场景,通过合理地使用缓存来提高系统性能和扩展性,并通过维护数据的一致性来避免数据不一致的问题。

Cache Aside demo

基于Cache Aside实现点赞功能。

实体类信息

public class Like {private String postId;private int likeCount;// 构造函数、getter和setter方法
}

逻辑层

@Service
public class LikeService {private final LikeRepository likeRepository;private final RedisUtils redisUtils;public LikeService(LikeRepository likeRepository, RedisUtils redisUtils) {this.likeRepository = likeRepository;this.redisUtils = redisUtils;}public Like getLikeInfo(String postId) {String cacheKey = "like:" + postId;// 从缓存中获取点赞信息Like like = (Like) redisUtils.get(cacheKey);// 如果缓存中不存在,则从持久层(数据库)获取if (like == null) {like = likeRepository.findByPostId(postId);// 如果数据库中存在数据,则保存到缓存中if (like != null) {redisUtils.set(cacheKey, like);}}// 如果点赞信息为空,则初始化为0if (like == null) {like = new Like(postId, 0);}return like;}public void addLike(String postId) {String cacheKey = "like:" + postId;// 在持久层(数据库)新增点赞信息Like like = likeRepository.findByPostId(postId);if (like == null) {like = new Like(postId, 1);} else {like.setLikeCount(like.getLikeCount() + 1);}likeRepository.save(like);// 更新缓存中的数据redisUtils.set(cacheKey, like);}
}

canal

引用canal官方说明:

canal [kə’næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

前置知识:MySQL主从复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
环境搭建

需要的开发环境:

  • MySQL
  • Redis
  • Canal

特别说明:canal只支持JDK 8和JDK 11,如果您在本地物理机安装,请切换JDK默认版本。笔者更建议您使用Docker安装开发环境,由于canal安装后需要修改的配置较多,可以通过Docker-Compose安装。

那么,麻烦ChatGPT写一个Docker-Compose文件吧:

  • version请按本地安装的Docker-Compose版本定义。
  • Docker-Compose安装请自行查询。
version: '2.4'services:mysql:image: mysql:8.0container_name: mysqlrestart: falseenvironment:MYSQL_ROOT_PASSWORD: rootports:- "33060:3306"volumes:- ./mysql-data:/var/lib/mysqlcanal:image: canal/canal-server:v1.1.5container_name: canalrestart: falseports:- "11111:11111"- "11112:11112"depends_on:- mysqlenvironment:- canal.destinations=example- canal.instance.mysql.slaveId=1234- canal.instance.master.address=mysql:3306- canal.instance.dbUsername=root- canal.instance.dbPassword=root- canal.instance.connectionCharset=UTF-8- canal.instance.tsdb.enable=false- canal.instance.gtidon=false- canal.instance.filter.regex=.*- canal.instance.filter.black.regex=mysql\.slave_.*redis:image: redis:latestrestart: alwaysports:- 6379:6379volumes:- ./redis_data:/data

将文件命名为:docker-compose.yml,开始安装。

docker-compose up -d

本案例使用balance余额表来演示,数据库表设计如下:

CREATE TABLE `balance` (`id` varchar(50) NOT NULL COMMENT '主键',`account` varchar(50) NOT NULL COMMENT '账户',`amount` decimal(10,2) NOT NULL COMMENT '金额',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 
COMMENT='余额表';
开发环境
  • JDK 17
  • SpringBoot 3.1.2
  • MyBatis-Plus 3.5.3.1
  • druid
  • lettuce

开发环境根据您的实际需要选择即可。

环境启动后,进入编码阶段。

/*** @author: liu_pc* Date:        2023/8/25* Description: 余额信息变更Redis变成处理类* Version:     1.0*/
@Component
public class BalanceRedisProcessorService implements EntryHandler<Balance>, Runnable {private final Logger logger = LoggerFactory.getLogger(BalanceRedisProcessorService.class);private final RedisUtils redisUtils;private final CanalConfig canalConfig;private final Executor executor;@Value("${canal.server.open}")private boolean open;@Autowiredpublic BalanceRedisProcessorService(RedisUtils redisUtils,CanalConfig canalConfig,@Qualifier("ownThreadPoolExecutor") Executor executor) {this.redisUtils = redisUtils;this.canalConfig = canalConfig;this.executor = executor;}@PostConstructpublic void init() {Map<String, String> mainMdcContext = Maps.newHashMap();mainMdcContext.put("canal-thread", "balance-redis-processor-service");MDC.setContextMap(mainMdcContext);executor.execute(this);logger.info("MySQL-Balance数据自动同步到Redis:线程已经启动");}@Overridepublic void run() {CanalConnector canalConnector = canalConfig.canalConnector();canalConnector.connect();// 回滚到未进行ack的地方canalConnector.rollback();try {while (open) {// 获取数据 每次获取一百条改变数据Message message = canalConnector.getWithoutAck(100);//获取这条消息的idlong batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);continue;}// 处理数据for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.DELETE) {for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {List<CanalEntry.Column> columns = rowData.getAfterColumnsList();String tableName = entry.getHeader().getTableName();// 判断是否是 Balance 表的 amount 字段变更if ("balance".equals(tableName)) {StringBuilder redisKey = new StringBuilder("balance:");for (CanalEntry.Column column : columns) {logger.info("Balance changed in 'balance' dataInfo: {}", column);if ("id".equals(column.getName())) {String changeId = column.getValue();logger.info("当前变更id为:{}", changeId);redisKey.append(changeId);}if ("amount".equals(column.getName())) {String changeValue = column.getValue();logger.info(changeValue);redisUtils.set(redisKey.toString(), changeValue);}}}}}}}// 确认消费完成这条消息canalConnector.ack(message.getId());logger.info("消费成功");}} catch (Exception e) {logger.warn("canal-消费失败");} finally {// 关闭连接canalConnector.disconnect();}}
}
测试

使用接口调用或者手动改库的方式,制造数据变更,查看日志打印情况:

Redis数据:

完成。

我已将canal实现数据同步代码开源,请自行下载领取,笔者不介意您宝贵的Star,如果能帮到您,十分荣幸。

mdc_logback

同时,如果您对笔者其他文章感兴趣,可以扫一扫关注笔者的公众号:种颗代码技术树

公众号文章更新更及时,以及一些程序员周边相关更新。

感谢您阅读到这里,不胜感激。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/1618960.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

数据分析基础(1)——超实用‼️Excel 常用函数和实用技巧

学习教程&#xff1a;☑️ 懒人Excel - Excel 函数公式、操作技巧、数据分析、图表模板、VBA、数据透视表教程 目录 一、Excel知识体系✨ 二、Excel 常用函数&#x1f4a1; 三、Excel 技巧 &#x1f914; 补充&#xff1a; 1、自学数据分析学习路线 2、数据查询网站 一、…

1.Redis 5 环境搭建

一、环境搭建 如果是Centos8&#xff0c;yum 仓库中默认的 Redis版本就是5&#xff0c;直接yum install即可。如果是Centos7&#xff0c;yum 仓库中默认的 Redis版本是3系列&#xff0c;比较老~ 为了我们能在 Centos7中下载到 Redis5 首先要安装额外的软件源 sudo yum insta…

AD常用快捷键记录

一、通用快捷键 1、放大缩小&#xff1a;常用方法&#xff0c;ctrl鼠标滚轮&#xff0c;鼠标中键移动鼠标&#xff0c;pgup、pgup。 2、切换不同的布线层&#xff1a;ctrlshift鼠标滚轮 3、在SCH或者PCB 同一平面内左右翻转&#xff1a;ctrlX 4、在SCH或者PCB 同一平面内上下…

AD9361常用配置概述

ENSM控制 AD9361的状态控制有两种方式&#xff0c;分别为SPI接口控制和引脚控制&#xff0c;也可以通过SPI接口控制使能状态机跳转。在AD9361的正常工作过程中&#xff0c;包括多种不同状态&#xff0c;分别为&#xff1a;SLEEP&#xff08;休眠状态&#xff09;、WAIT&#x…

史上最全的Altium Designer 20安装教程

首先我们下载AD20&#xff0c;下载的链接我放在下面了&#xff01; AD20.2.3下载链接 下载好了之后&#xff0c;得到下面的镜像文件&#xff01; 右击镜像文件&#xff0c;解压&#xff01; 解压后得到如下文件夹&#xff1a; 打开文件夹&#xff0c;右键单击以管理员身份运行…

解决报错: Could not initialize class com.platform.cache.J2CacheUtils

今天运行一个比较久远的SSM项目&#xff0c;访问接口时报错&#xff1a; Could not initialize class com.platform.cache.J2CacheUtils 找了半天也没有发现问题所在&#xff0c;突然发现报错里面有ShiroFilter字样&#xff0c;然后想起以前shiro好像是要跟redis对接&#xff0…

瑞友天翼应用虚拟化系统RCE漏洞复现+利用

1、产品简介 瑞友天翼应用虚拟化系统是西安瑞友信息技术资讯有限公司研发的具有自主知识产权&#xff0c;基于服务器计算架构的应用虚拟化平台。它将用户各种应用软件集中部署在瑞友天翼服务器(群)上&#xff0c;客户端通过WEB即可快速安全的访问经服务器上授权的应用软件&…

Autosar存储入门系列03_NVM状态机及读写存储调用逻辑

本文框架 0.前言1. NVM状态机介绍2. NVM读/写基本逻辑2.1 NVM读操作2.2 NVM写操作2.2.1 实时写2.2.2 下电写 2.3 NVM写入注意事项 0.前言 本系列是Autosar存储入门系列&#xff0c;希望能从学习者的角度把存储相关的知识点梳理一遍&#xff0c;这个过程中如果大家觉得有讲得不…

PC天翼云盘v6.3.4精简版

介绍&#xff1a; 由于天翼云盘网页端不能上传大文件了&#xff0c;需要客户端&#xff0c;于是制作了绿色版&#xff0c;直接打开就能用&#xff0c;装到u盘&#xff0c;走到哪用到哪。 下载不限速&#xff0c;上传文件大小无限制&#xff0c;支持识别MD5秒传。 这是天翼云盘…

如何外网登录访问瑞友天翼应用虚拟化系统?——快解析内网端口映射方案

瑞友天翼应用虚拟化系统&#xff08;GWT System&#xff09;是国内具有自主知识产权的应用虚拟化平台&#xff0c;是基于服务器计算&#xff08;Server-based Computing&#xff09;的应用虚拟化平台。如何将内网平台提供到互联网上外网访问&#xff0c;是我们比较关注的问题。…

Goby 漏洞更新 | 瑞友天翼应用虚拟化系统 index.php 文件远程代码执行漏洞

漏洞名称&#xff1a; 瑞友天翼应用虚拟化系统 index.php 文件远程代码执行漏洞 English Name&#xff1a;Ruiyou Tianyi Application Virtualization System Index.php File Remote Code Execution Vulnerability CVSS core: 9.3 影响资产数&#xff1a;61711 漏洞描述&am…

[maven]关于pom文件中的<relativePath>标签

关于pom文件中的<relativePath>标签 为什么子工程要使用relativePath准确的找到父工程pom.xml.因为本质继承就是pom的继承。父工程pom文件被子工程复用了标签。&#xff08;可以说只要我在父工程定义了标签&#xff0c;子工程就可以没有&#xff0c;因为他继承过来了&…

IO模型和NGINX安装升级

IO模型和NGINX安装升级 IO模型 IO概念 I/O在计算机中指Input/Output&#xff0c; IOPS (Input/Output Per Second)即每秒的输入输出量(或读写次数)&#xff0c;是衡量磁盘性能的主要指标之一。 Linux的IO类型 磁盘I/O 磁盘I/O是进程向内核发起系统调用&#xff0c;请求磁…

EditPlus安装教程

首先官网下载EditPlus&#xff1a;https://www.editplus.com/ 然后直接一直图示安装 &#xff08;也可以放其他盘&#xff09; 最后在下图中输入下面内容&#xff1a; Username: Vovan Regcode: 3AG46-JJ48E-CEACC-8E6EW-ECUAW 大功告成&#xff01;

ICLR 2023 | StrucTexTv2:端到端文档图像理解预训练框架

点击下方卡片&#xff0c;关注“CVer”公众号 AI/CV重磅干货&#xff0c;第一时间送达 点击进入—>【计算机视觉】微信技术交流群 转载自&#xff1a;CSIG文档图像分析与识别专委会 本文简要介绍ICLR 2023录用论文“StrucTexTv2: Masked Visual-Textual Prediction for Docu…

静态方法 与 非静态方法的区别/static 关键字

为什么80%的码农都做不了架构师&#xff1f;>>> 使用static修饰的静态方法是属于整个类的类方法&#xff0c;它在内存中的代码段会随类的定义而被分配和装载&#xff1b;而非静态方法是属于具体对象的方法&#xff0c;当这个对 象创建时&#xff0c;在对象的内存中…

Python 通过traceback追溯异常信息

Python 通过traceback追溯异常信息 导入traceback包 import traceback自定义函数 def func_3():return 1 / 0def func_2():func_3()def func_1():func_2()捕捉异常 try:func_1() except Exception as e:traceback_info traceback.format_exc()print("traceback_info"…

TSRFormer:复杂场景的表格结构识别新利器

编者按&#xff1a;近年来&#xff0c;各大企业和组织机构都在经历数字化转型。将文档转换成计算机所能识别的样态&#xff0c;是数字化转型的关键步骤&#xff0c;如何识别出图片中表格具体的结构与内容&#xff0c;并直接提取其中的数据和信息是学术界和工业界共同瞩目的焦点…

C++设计模式_01_设计模式简介(多态带来的便利;软件设计的目标:复用)

文章目录 本栏简介1. 什么是设计模式2. GOF 设计模式3. 从面向对象谈起4. 深入理解面向对象5. 软件设计固有的复杂性5.1 软件设计复杂性的根本原因5.2 如何解决复杂性 ? 6. 结构化 VS. 面向对象6.1 同一需求的分解写法6.1.1 Shape1.h6.1.2 MainForm1.cpp 6.2 同一需求的抽象的…

聚类分析 | MATLAB实现基于AHC聚类算法可视化

聚类分析 | MATLAB实现基于AHC聚类算法可视化 目录 聚类分析 | MATLAB实现基于AHC聚类算法可视化效果一览基本介绍程序设计参考资料 效果一览 基本介绍 AHC聚类算法&#xff0c;聚类结果可视化&#xff0c;MATLAB程序。 Agglomerative Hierarchical Clustering&#xff08;自底…