Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性

目录

1. 背景

2. Windows系统安装canal

3.Mysql准备工作

4. 公共依赖包

5. Redis缓存设计

6. mall-canal-service


1. 背景

canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。所以其核心功能如下:

  • 数据实时备份
  • 异构数据源(elasticsearch、Hbase)与数据库数据增量同步
  • 业务缓存cache 刷新,保证缓存一致性
  • 带业务逻辑的增量数据处理,如监听某个数据的变化做一定的逻辑处理

canal是借助于MySQL主从复制原理实现

  • master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  • slave将master的binary log events拷贝到它的中继日志(relay log);
  • slave重做中继日志中的事件,将改变反映它自己的数据。

 canal的工作原理:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

2. Windows系统安装canal

Github下载链接:Releases · alibaba/canal · GitHub

 

 解压

 进入/conf/example,修改instance.properties

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

之后进入到bin目录下启动startup就可以了。

3.Mysql准备工作

/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*//*!40101 SET NAMES utf8 */;/*!40101 SET SQL_MODE=''*/;/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;USE `shop`;/*Table structure for table `brand` */DROP TABLE IF EXISTS `brand`;CREATE TABLE `brand` (`id` INT NOT NULL AUTO_INCREMENT COMMENT '品牌id',`name` VARCHAR(100) NOT NULL COMMENT '品牌名称',`image` VARCHAR(1000) DEFAULT '' COMMENT '品牌图片地址',`initial` VARCHAR(1) DEFAULT '' COMMENT '品牌的首字母',`sort` INT DEFAULT NULL COMMENT '排序',PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 COMMENT='品牌表';/*Data for the table `brand` */INSERT  INTO `brand`(`id`,`name`,`image`,`initial`,`sort`) VALUES 
(11,'华为','https://sklll.oss-cn-beijing.aliyuncs.com/secby/eed72cc4-a9c1-4010-949a-03cef5b933d6.jpg','',NULL),
(12,'中兴','https://sklll.oss-cn-beijing.aliyuncs.com/secby/4fedb361-5ab3-4ad0-a667-580c1f37dff0.jpg','',NULL),
(13,'大疆','https://sklll.oss-cn-beijing.aliyuncs.com/secby/e8382c48-0487-4a9b-8fd0-a3716c3eea19.jpg','',NULL);/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

4. 公共依赖包

<dependencies><!--web包--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--MyBatis Plus--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.2</version></dependency><!--MySQL--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!--Redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--Nacos--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency></dependencies>

5. Redis缓存设计

这里基于Springcloud Alibaba进行设计,对应的服务名称为mall-goods

bootstrap.yaml代码如下:

server:port: 8081
spring:application:name: mall-goodsdatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTCusername: rootpassword: 123456cloud:nacos:config:file-extension: yamlserver-addr: localhost:8848discovery:#Nacos的注册地址server-addr: localhost:8848redis:host: xx //改为自己redis ip地址port: 6379

导入RedisConfig配置

@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(factory);GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer();// 值采用json序列化redisTemplate.setValueSerializer(serializer);//使用StringRedisSerializer来序列化和反序列化redis的key值redisTemplate.setKeySerializer(new StringRedisSerializer());// 设置hash key 和value序列化模式redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(serializer);redisTemplate.afterPropertiesSet();return redisTemplate;}@Beanpublic RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(12))//设置默认缓存时间.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);}
}

对于Brand进行redis缓存设计

在主启动类上添加@EnableCaching开启缓存。

@Cacheable 注解可以标记一个方法需要被缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会先查找缓存,如果缓存中存在相应的数据,则直接从缓存中读取,否则执行方法并将结果缓存到缓存中。

@CachePut 注解可以标记一个方法需要更新缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会更新缓存中的数据。

@CacheEvict 注解可以标记一个方法需要删除缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会删除缓存中对应的数据。

代码如下:

@RestController
@RequestMapping("/brand")
public class BrandController {@AutowiredBrandService brandService;@GetMapping("/{id}")@Cacheable(value = "brand",key = "#id")public Brand search(@PathVariable(value = "id")Integer id){return brandService.getById(id);}/***** 添加缓存*/@PostMapping@CachePut(value = "brand",key = "#brand.id")public Brand add(@RequestBody Brand brand){return brand;}/***** 修改缓存*/@PutMapping@CachePut(value = "brand",key = "#brand.id")public Brand update(@RequestBody Brand brand){return brand;}/***** 删除缓存*/@DeleteMapping("/{id}")@CacheEvict(value = "brand",key = "#id")public void delete(@PathVariable(value = "id")Integer id){}
}

6. mall-canal-service

这里微服务名称为mall-canal-service,监控Mysql数据库数据变化进行实时的更新缓存。

除公共依赖外导入依赖

     <dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-loadbalancer</artifactId></dependency>

bootstrap.yaml设计如下:

server:port: 8083
spring:application:name: mall-canalcloud:nacos:config:file-extension: yamlserver-addr: localhost:8848discovery:#Nacos的注册地址server-addr: localhost:8848
#Canal配置
canal:server: localhost:11111destination: example

设计Feign接口

@FeignClient(value = "mall-goods",contextId = "brand")//服务名字
public interface BrandFeign {@GetMapping("/brand/{id}")Brand search(@PathVariable(value = "id")Integer id);@PostMapping("/brand")Brand add(@RequestBody Brand brand);/***** 修改方法*/@PutMapping("/brand")Brand update(@RequestBody Brand brand);/***** 删除方法*/@DeleteMapping("/brand/{id}")void delete(@PathVariable(value = "id")Integer id);}

Canal实时监控Mysql数据库变化,代码设计如下:

@Component
@CanalTable(value = "brand")
public class BrandHandler implements EntryHandler<Brand> {@AutowiredBrandFeign brandFeign;@Overridepublic void insert(Brand brand) {System.out.println(brand);brandFeign.add(brand);}@Overridepublic void update(Brand before, Brand after) {System.out.println(after);brandFeign.update(after);}@Overridepublic void delete(Brand brand) {System.out.println(brand);brandFeign.delete(brand.getId());}
}

在MallCanalApplication主启动类上添加@EnableFeignClients(basePackages = {"org.example.feign"}),包名为feign接口路径。

注意:本人在使用canal时,数据库有非varchar类型的null值,在运行时会抛异常错误。

	at top.javatool.canal.client.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:52) ~[canal-client-1.2.1-RELEASE.jar:na]at top.javatool.canal.client.handler.impl.AsyncMessageHandlerImpl.lambda$handleMessage$0(AsyncMessageHandlerImpl.java:30) ~[canal-client-1.2.1-RELEASE.jar:na]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_281]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_281]at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_281]
Caused by: java.lang.NumberFormatException: For input string: ""at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_281]

代码运行后,经过测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2659769.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

nginx源码分析-1

使用gdb查看函数上下文&#xff1a; gdb attach nginx的work线程 监听端口状态时&#xff1a; 断点打在ngx_http_process_request 并通过浏览器触发请求时&#xff1a;

把这些软件测试经典面试题!全背下来,拿offer就像喝水一样!

1、什么是兼容性测试&#xff1f;兼容性测试侧重哪些方面&#xff1f; 兼容测试主要是检查软件在不同的硬件平台、软件平台上是否可以正常的运行&#xff0c;即是通常说的软件的可移植性。兼容的类型&#xff0c;如果细分的话&#xff0c;有平台的兼容&#xff0c;网络兼容&am…

哈希桶的模拟实现【C++】

文章目录 哈希冲突解决闭散列 &#xff08;开放定址法&#xff09;开散列 &#xff08;链地址法、哈希桶&#xff09;开散列实现&#xff08;哈希桶&#xff09;哈希表的结构InsertFindErase 哈希冲突解决 闭散列 &#xff08;开放定址法&#xff09; 发生哈希冲突时&#xf…

【算法刷题】Day26

文章目录 1. 买卖股票的最佳时机含冷冻期题干&#xff1a;算法原理&#xff1a;1. 状态表示&#xff1a;2. 状态转移方程3. 初始化4. 填表顺序5. 返回值 代码&#xff1a; 2. 替换所有的问号题干&#xff1a;算法原理&#xff1a;代码&#xff1a; 1. 买卖股票的最佳时机含冷冻…

组件间的值传递:改进若依框架中的BarChart.vue组件

改进前的BarChart 如下是若依(Ruoyi)框架中的BarChart.vue文件&#xff0c;该BarChart.vue无法实现组件间的值传递。到这里您不妨可以试试该如何去传值。如果您不想思考&#xff0c;请看改进后的BarChart。直接拿走使用&#xff01; <template><div :class"cla…

大量数据的渲染优化-分页渲染方案

文章目录 直接渲染数据的拆分使用定时器分页渲染 相信有一道耳熟能详的题目&#xff0c;如果前端获取到了 10w 条数据&#xff0c;应该怎么渲染&#xff1f;本文就以此为例&#xff0c;来进行切入&#xff0c;解析大量数据渲染的方案 直接渲染 样式代码比较简单&#xff0c;我就…

Android原生实现单选

六年前写的一个控件&#xff0c;一直没有时间总结&#xff0c;趁年底不怎么忙&#xff0c;整理一下之前写过的组件。供大家一起参考学习。废话不多说&#xff0c;先上图。 一、效果图 实现思路使用的是radioGroup加radiobutton组合方式。原理就是通过修改RadioButton 的backgr…

vue3-富文本编辑器(vue-quill)

官网&#xff1a;VueQuill | Rich Text Editor Component for Vue 3 安装 pnpm add vueup/vue-quilllatest 使用 局部使用 先导包 import { QuillEditor } from vueup/vue-quill import vueup/vue-quill/dist/vue-quill.snow.css; 再使用 <QuillEditor theme"snow…

想要快速搭建知识付费平台?找明理信息科技!

明理信息科技知识付费saas租户平台 一、确定目标群体 首先&#xff0c;你需要明确你的知识付费平台的目标用户是谁。这将帮助你确定所需的内容和功能&#xff0c;以及如何吸引和留住这些用户。例如&#xff0c;如果你的目标群体是职场新人&#xff0c;你的平台可能需要提供…

MFC工程中无法使用cygwin64的库

文章目录 MFC工程中无法使用cygwin64的库概述在MFC中使用cygwin64的静态库在MFC中使用cygwin64的DLL进行静态包含在MFC中使用cygwin64的DLL进行动态调用唯一可以使用cygwin64的方法是进程隔离来通讯cygwin64的官方用途修正后的启动进程隐藏dos窗口的函数动态载入DLL的实现 - La…

代码随想录27期|Python|Day29|回溯算法|491.递增子序列|46.全排列|47.全排列 II

491. 非递减子序列 本题不是单纯的去重题目&#xff0c;而是需要保持数字在原数组的顺序。 比如&#xff1a;[4,5,6,7]和[4,6,5,7]相比&#xff0c;后者就不能选择[5,6,7]这个排列&#xff0c;因为违反了设置的顺序。所以去重的方法就只有哈希表。 需要在每一层设置一个哈希表…

leaflet学习笔记-初始化vue项目(一)

leaflet简介 Leaflet是一款开源的轻量级交互式地图可视化JavaScript库&#xff0c;能够满足大多数开发者的地图可视化需求&#xff0c;其最早的版本大小仅仅38 KB。Leaflet能够在主流的计算机或移动设备上高效运行&#xff0c;其功能可通过插件进行扩展&#xff0c;拥有易于使用…

【Linux】指令(本人使用比较少的)——笔记(持续更新)

文章目录 ps -axj&#xff1a;查看进程ps -aL&#xff1a;查看线程echo $?&#xff1a;查看最近程序的退出码jobs&#xff1a;查看后台运行的线程组fd 任务号&#xff1a;将后台任务提到前台bg 任务号&#xff1a;将暂停的后台程序重启netstat -nltp&#xff1a;查看服务及监听…

C#中的Attribute详解(下)

C#中的Attribute详解&#xff08;下&#xff09; 一、Attribute本质二、Attribute实例化三、Attribute实例化的独特之处四、元数据的作用五、自定义Attribute实例六、Attribute的附着目标七、附加问题 一、Attribute本质 从上篇里我们可以看到&#xff0c;Attribute似乎总跟pu…

SAP VA01 创建带wbs号的销售订单包 CJ067的错误

接口错误提示如下 SAP官方 CJ067 124177 - VA01: CJ067 during WBS acct assgmt with a different business area S4的core 刚好能用上 实施 这个note后成功

nestjs入门教程系列(一):让项目先跑起来

nestjs启动基本步骤 Nest (NestJS) 是一个用于构建高效、可扩展的 Node.js 服务器端应用的框架。 它使用渐进式 JavaScript&#xff0c;构建并完全支持 TypeScript&#xff08;但仍然允许开发者使用纯 JavaScript 进行编码&#xff09;并结合了 OOP&#xff08;面向对象编程&am…

第二节-数据封装+传输介质

数据传输的形式&#xff1a; 1.电路交换 2.报文交换&#xff1a; 在数据之外&#xff0c;加上能够标识接收者以及发送者的信息 3.分组交换&#xff1a; 依然进行报文交换&#xff0c;不过讲每个数据的大小进行定义 应用层&#xff08;数据data&#xff09;->传输层&am…

K8S部署Harbor仓库实战

K8S部署Harbor仓库实战 K8S部署Harbor仓库实战 - 简书 创建文件目录 chartmuseum目录: /var/nfs/data/harbor/chartmuseumdatabase目录: /var/nfs/data/harbor/databasejobservice目录: /var/nfs/data/harbor/jobserviceredis目录: /var/nfs/data/harbor/redisregistry目录:…

深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第五节 引用类型复制问题及用克隆接口ICloneable修复

深入浅出图解C#堆与栈 C# Heaping VS Stacking 第五节 引用类型复制问题及用克隆接口ICloneable修复 [深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第一节 理解堆与栈](https://mp.csdn.net/mdeditor/101021023)[深入浅出图解C#堆与栈 C# Heap(ing) VS Stack(ing) 第二节…

【头歌实训】kafka-入门篇

文章目录 第1关&#xff1a;kafka - 初体验任务描述相关知识Kafka 简述Kafka 应用场景Kafka 架构组件kafka 常用命令 编程要求测试说明答案代码 第2关&#xff1a;生产者 &#xff08;Producer &#xff09; - 简单模式任务描述相关知识Producer 简单模式Producer 的开发步骤Ka…