Redis学习(十)|使用消息队列的重试机制实现 MySQL 和 Redis 的数据一致性

文章目录

  • 介绍
  • 原理
  • 整体方案
  • 实现步骤
  • 示例代码
  • 总结
  • 其他:Kafka 重试策略配置
    • 1. 生产者重试策略配置
    • 2. 消费者重试策略配置

介绍

在分布式系统中,保持 MySQL 和 Redis 之间的数据一致性是至关重要的。为了确保数据的一致性,我们通常采取先更新数据库,再删除缓存的方案。然而,在实际应用中,由于网络问题、服务故障等原因,可能会导致数据库更新成功而缓存删除失败,进而导致数据不一致。为了解决这个问题,我们可以引入消息队列的重试机制,以确保缓存删除成功。

原理

重试机制是一种容错机制,用于在消息发送失败或者处理失败时进行重试。通过将数据更新操作封装成消息,并发送到消息队列中,在消费者处理消息时进行重试,可以提高系统的可靠性和稳定性。我们将使用消息队列的重试机制来实现 MySQL 和 Redis 的数据一致性。

整体方案

整体方案如下:

  1. 应用程序首先将数据更新操作发送到消息队列中。
  2. 消费者从消息队列中获取消息,并根据消息中的数据删除 Redis 中的缓存数据。
  3. 如果应用删除缓存失败,可以从消息队列中重新读取数据,然后再次删除缓存,这个就是重试机制。当然,如果重试超过的一定次数,还是没有成功,就需要向业务层发送报错信息了。
  4. 如果删除缓存成功,就要把数据从消息队列中移除,避免重复操作,否则就继续重试。

实现步骤

以下是使用消息队列的重试机制实现 MySQL 和 Redis 数据一致性的基本步骤:

  1. 将数据更新操作封装成消息:在应用程序中,将数据更新操作封装成消息,并发送到消息队列中。消息中应包含数据更新操作的类型(如插入、更新或删除)以及相关的数据。
  2. 消费者消息处理和失败重试:消费者从消息队列中获取消息,并根据消息中的数据更新操作来删除 Redis 中的缓存数据。消息消费失败后自动进行重试。可以根据重试次数和重试间隔来配置重试机制,例如指数退避策略。
  3. 消息确认机制:消费者在成功处理消息后,需要发送确认消息给消息队列,告知消息队列可以删除或标记消息为已处理。这样可以确保消息在成功处理后不会被重新处理,避免重复处理的情况。

示例代码

以下是一个简单的示例代码,演示了如何使用 Java 实现消息队列的重试机制,以确保 MySQL 和 Redis 的数据一致性:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class MySQLToKafka {private static final String TOPIC_NAME = "mysql_updates";private static final String BOOTSTRAP_SERVERS = "localhost:9092";private static final String GROUP_ID = "cache-deletion-group";public static void main(String[] args) {// 模拟 MySQL 更新后发送消息到 KafkasendMySQLUpdateToKafka("data_update");// 模拟从 Kafka 拉取消息删除 Redis 缓存pullMessageFromKafkaAndDeleteCache();}// 发送 MySQL 更新消息到 Kafkaprivate static void sendMySQLUpdateToKafka(String message) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);try {producer.send(record).get();System.out.println("Message sent to Kafka successfully: " + message);} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}// 从 Kafka 拉取消息并删除 Redis 缓存private static void pullMessageFromKafkaAndDeleteCache() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC_NAME));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());if (deleteCacheFromRedis(record.value())) {System.out.println("Cache deleted from Redis successfully.");// 处理消息成功后手动提交偏移量consumer.commitAsync();} else {System.out.println("Cache deletion from Redis failed. Kafka will retry.");}}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}}// 模拟删除 Redis 缓存private static boolean deleteCacheFromRedis(String message) {// 这里省略删除 Redis 缓存的逻辑,直接模拟删除成功和失败// 模拟删除成功的概率为 0.7if (Math.random() < 0.7) {return true;}return false;}
}

以上代码完成了以下几个功能:

  1. 发送 MySQL 更新消息到 Kafka: sendMySQLUpdateToKafka 方法模拟了 MySQL 更新后发送消息到 Kafka 的过程。它使用 Kafka 生产者将消息发送到指定的 Kafka 主题中。
  2. 从 Kafka 拉取消息并删除 Redis 缓存: pullMessageFromKafkaAndDeleteCache 方法模拟了从 Kafka 拉取消息并删除 Redis 缓存的过程。它使用 Kafka 消费者订阅指定的 Kafka 主题,并轮询获取消息。对于每条消息,它尝试删除对应的 Redis 缓存。如果删除失败,就会打印一条消息表示失败,然后 Kafka 将自动重试该消息。只有在成功删除缓存后,才会手动提交偏移量。
  3. 模拟删除 Redis 缓存: deleteCacheFromRedis 方法用于模拟删除 Redis 缓存的过程。它返回一个布尔值,表示缓存删除操作的成功或失败。在实际应用中,这个方法应该被替换为真正的删除 Redis 缓存的逻辑。

通过这样的流程,模拟了一个简单的数据更新、消息发送、消息消费、缓存删除的完整流程,并且在处理消息失败时利用 Kafka 的自动重试机制进行了处理。

总结

通过引入消息队列的重试机制,可以有效地实现 MySQL 和 Redis 的数据一致性。使用重试机制,可以确保数据在 MySQL 更新后 Redis 的对应缓存能够成功删除,从而保持数据的一致性。这种方法适用于需要处理大量数据更新和异步消息传输的场景,同时也提高了系统的可靠性和稳定性。

其他:Kafka 重试策略配置

1. 生产者重试策略配置

对于 Kafka 生产者,可以通过配置以下参数来定义重试策略:

  • retries: 设置生产者在发生可重试的异常时重试的最大次数。默认值为 2147483647(即最大整数)。
  • retry.backoff.ms: 设置生产者在重试之间等待的时间。默认值为 100 毫秒。

示例配置:

# 设置最大重试次数为 3 次
retries=3
# 设置重试之间的等待时间为 500 毫秒
retry.backoff.ms=500

2. 消费者重试策略配置

对于 Kafka 消费者,可以通过以下参数来定义重试策略:

  • enable.auto.commit: 指定消费者是否自动提交偏移量。默认为 true,表示自动提交。
  • auto.commit.interval.ms: 如果启用了自动提交偏移量,可以通过该参数设置自动提交的间隔时间。默认值为 5000 毫秒。
  • max.poll.interval.ms: 设置消费者在拉取消息之间的最大时间间隔。如果消费者在此间隔内没有发送心跳,将被认为失败,并且将其分区重新分配给其他消费者。默认值为 300000 毫秒(5 分钟)。
  • max.poll.records: 设置消费者在单次调用 poll 方法中拉取的最大记录数。默认值为 500 条。

示例配置:

# 禁用自动提交偏移量
enable.auto.commit=false
# 设置自动提交偏移量的间隔时间为 1000 毫秒
auto.commit.interval.ms=1000
# 设置拉取消息之间的最大时间间隔为 10 秒
max.poll.interval.ms=10000
# 设置单次 poll 方法拉取的最大记录数为 100 条
max.poll.records=100

通过以上配置,可以定制 Kafka 生产者和消费者的重试策略,以适应不同的业务需求和性能要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3017230.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Elasticsearch:使用 MongoDB connector 同步数据到 Elasticsearch

MongoDB 是一个基于分布式文件存储的数据库。由 C 语言编写。旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。MongoDB 是一个介于关系数据库和非关系数据库之间的产品&#xff0c;是非关系数据库当中功能最丰富&#xff0c;最像关系数据库的。Elasticsearch 是一个高效强…

TCP经典异常问题探讨与解决

作者&#xff1a;kernelxing TCP的经典异常问题无非就是丢包和连接中断&#xff0c;在这里我打算与各位聊一聊TCP的RST到底是什么&#xff1f;现网中的RST问题有哪些模样&#xff1f;我们如何去应对、解决&#xff1f;本文将从RST原理、排查手段、现网痛难点案例三个板块自上而…

【容器】Pod 生命周期

概述 Pod的生命周期包含从Pod创建事件的触发到Pod被停止的整个流程。了解Pod的生命周期方便日常排障&#xff0c;并能帮助较深入了解K8s。 在Pod生命周期中有两个重要的标识&#xff1a;Pod Condition 和 Pod Phase。Pod Phase提供了一个Pod当前状况的概览&#xff0c;可以帮…

Unity EventSystem入门

概述 相信在学习Unity中&#xff0c;一定有被UI事件困扰的时候把&#xff0c;当添加UICanvas的时候&#xff0c;Unity会为我们自动添加EventSystem&#xff0c;这个是为什么呢&#xff0c;Unity的UI事件是如何处理的呢&#xff0c;在使用各个UI组件的时候&#xff0c;一定有不…

Linux搭建http发布yum源

1、搭建http源yum仓库 &#xff08;1&#xff09;在yum仓库服务端安装httpd yum -y install httpd &#xff08;2&#xff09;修改配置文件 我们httpd 中默认提供web 界面的位置是我们/var/www/html 目录&#xff0c;如果我们yum 源想指定目录&#xff0c;就需要修改蓝框2处…

如何用TONGYILingma进行AI辅助编程?

通义灵码&#xff0c;是阿里云出品的一款基于通义大模型的智能编码辅助工具&#xff0c;提供行级/函数级实时续写、自然语言生成代码、单元测试生成、代码优化、注释生成、代码解释、研发智能问答、异常报错排查等能力&#xff0c;并针对阿里云的云服务使用场景调优&#xff0c…

使用quicker进行局域网文件互传

使用了动作&#xff1a;文件服务器 https://getquicker.net/Sharedaction?code7a49ca6b-d243-4478-1e87-08d9f1ba2358 在文件夹中打开打开这个动作就能使用。 配置 右键动作可以设置&#xff1a; 选择了最后一个之后&#xff0c;打开服务之后能在右下角有一个弹窗&#xff…

八、Redis集群模式(3主3从)

目录 一、环境准备 二、集群搭建 一、环境准备 IP 角色 192.168.134.132 192.168.134.132&#xff1a;7001 192.168.134.132&#xff1a;7002 192.168.134.133 192.168.134.133&#xff1a;7003 192.168.134.133&#xff1a;7004 192.168.134.134 192.168.134.134&…

基于Springboot的教学辅助系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的教学辅助系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&…

男士内裤多久换一次最合适?男士最靠谱的内裤品牌推荐

很多男同胞可能认为内裤没什么好讲究的&#xff0c;随便穿一条就好&#xff0c;而且也没有定期更换。实际上长期穿质量不好或没有定期更换内裤&#xff0c;会对健康造成一定影响。而现在市面上的男士内裤品牌和种类又有不少&#xff0c;让大家选择时感觉眼花缭乱&#xff0c;所…

Linux —— 信号(3)

Linux —— 信号&#xff08;3&#xff09; Core dump为什么core默认是被关闭的阻塞信号信号其他相关常见概念信号递达信号未决信号阻塞两者的区别信号的结构 信号集操作函数一个简单使用例子sigpending的使用例子 我们今天接着来了解信号&#xff1a; Core dump 大家不知道有…

Mybatis的简介和下载安装

什么是 MyBatis &#xff1f; MyBatis 是一款优秀的持久层框架&#xff0c;它支持定制化 SQL、存储过程以及高级映射。MyBatis 避免了几乎所有的 JDBC 代码和手动设置参数以及获取结果集。MyBatis 可以使用简单的 XML 或注解来配置和映射原生信息&#xff0c;将接口和 Java 的…

北京大学-知存科技存算一体联合实验室揭牌,开启知存科技产学研融合战略新升级

5月5日&#xff0c;“北京大学-知存科技存算一体技术联合实验室”在北京大学微纳电子大厦正式揭牌&#xff0c;北京大学集成电路学院院长蔡一茂、北京大学集成电路学院副院长鲁文高及学院相关负责人、知存科技创始人兼CEO王绍迪、知存科技首席科学家郭昕婕博士及企业研发相关负…

Mybatis进阶4-权限管理

权限管理 1.权限 //相当于 职责 2.用户 //相当于 职员&#xff08;职员就职于一个职位&#xff09; 3.角色 //相当于 职位&#xff08;有多个职责&#xff09; 权限管理基础表&#xff1a;权限表&#xff0c;用户表&#xff0c;角色表 问题1&#xff1a;…

2005-2021年全国各地级市生态环境注意力/环保注意力数据(根据政府报告文本词频统计)

2005-2021年全国各地级市生态环境注意力/环保注意力数据&#xff08;根据政府报告文本词频统计&#xff09; 2005-2021年全国各地级市生态环境注意力/环保注意力数据&#xff08;根据政府报告文本词频统计&#xff09; 1、时间&#xff1a;2005-2021年 2、范围&#xff1a;2…

BIGRU、CNN-BIGRU、CNN-BIGRU-ATTENTION、TCN-BIGRU、TCN-BIGRU-ATTENTION合集

&#xff08;BIGRU、CNN-BIGRU、CNN-BIGRU-ATTENTION、TCN-BIGRU、TCN-BIGRU-ATTENTION&#xff09;时&#xff0c;我们可以从它们的基本结构、工作原理、应用场景以及优缺点等方面进行详细介绍和分析。 BIGRU、CNN-BIGRU、CNN-BIGRU-ATTENTION、TCN-BIGRU等&#xff08;matlab…

DDR4 新功能介绍

DDR4(第四代双倍数据率同步动态随机存取内存)相较于其前代DDR3,引入了一些新的功能和改进,这些新功能有助于提高内存的性能、降低功耗以及增强系统的可靠性,包括VPP、DBI(Data Bus Inversion,数据总线翻转)和DMI(与LPDDR4相关)。以下是对这些功能的简要说明: 更高的…

Python | Leetcode Python题解之第67题二进制求和

题目&#xff1a; 题解&#xff1a; class Solution:def addBinary(self, a, b) -> str:return {0:b}.format(int(a, 2) int(b, 2))

synchronized与volatile关键字

1.synchronized的特性 1.1互斥 synchronized 会起到互斥效果, 某个线程执行到某个对象的 synchronized 中时, 其他线程如果也执行到 同一个对象 synchronized 就会阻塞等待. 进入 synchronized 修饰的代码块, 相当于 加锁 退出 synchronized 修饰的代码块, 相当于 解锁 syn…

关于在Conda创建的虚拟环境中安装好OpenCV包后,在Pycharm中依然无法使用且import cv2时报错的问题

如果你也掉进这个坑里了&#xff0c;请记住opencv-python&#xff01;opencv-python&#xff01;&#xff01;opencv-python&#xff01;&#xff01;&#xff01; 不要贪图省事直接在Anaconda界面中自动勾选安装libopencv/opencv/py-opencv包&#xff0c;或者在Pycharm中的解…