canal 数据同步组件

canal 数据异构组件

为啥要使用这个组件?
在更新DB的时候不同步更新到redis,es等数据库中,时间太久,而且可能会存在同步失败的问题,因此引入canal去拉取DB的数据,再去更新到redis,es等数据库中,有失败重试和回滚等功能。
canal原理?
canal 伪装成salve向mysql发送dump协议,拿到备份数据binlog,去更新数据到redis,es等数据库中或者通过组装数据之后更新。canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据

canal 组件的使用

1.下载canal组件

下载地址canal组件下载地址
在我的资源中也有canal组件包
在这里插入图片描述
解压启动(我是windows版,双击startup.bat)

在这里插入图片描述

2.数据库配置

1.开启MySQL , 需要先开启 Binlog 写入功能

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2.授权 canal 作为mysql 的slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.项目引入jar包
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>
4.写canal监听数据工具类
package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}
5.简单例子使用测试

1.数据库更改user_id从0改为1,再从1改为0
2.查看canal监测的数据(canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.进一步完善canal监听数据工具类,用于应用例子

1.加入监听器,项目启动时启动
2.使用线程去监听数据
3.替换掉system.out.print(),里面有锁,会阻塞,使用日志打印
4.处理canal监测到的数据

package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.next.dao.TrainNumberDetailMapper;
import com.next.service.TrainNumberService;
import com.next.service.TrainSeatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;/*** @desc 不要用system.out.print()里面有锁,会阻塞,用日志打印*/
@Service
@Slf4j
public class CanalSubscribe implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate TrainSeatService trainSeatService;@Resourceprivate TrainNumberService trainNumberService;//监听,启动的时候就开始调用此监听方法@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {canalSubscribe();}private void canalSubscribe() {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;//使用线程new Thread(() -> {try {log.info("canal subscribe");connector.connect();connector.subscribe(".*\\..*");connector.rollback();while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//没有取到数据继续safeSleep(100);continue;}try {log.info("new message,batchIds:{},size:{}", batchId, batchSize);//打印日志printEntry(message.getEntries());// 提交确认connector.ack(batchId);} catch (Exception e2) {log.error("canal data exception,batchIds:{}", batchId, e2);// 处理失败, 回滚数据connector.rollback(batchId);}}} catch (Exception e3) {log.error("canal subscribe exception", e3);safeSleep(1000);canalSubscribe();}}).start();}private void printEntry(List<CanalEntry.Entry> entrys) throws Exception{for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("RowChange.parse Exception , data:" + entry, e);}//更新类型-更新,删除,新增CanalEntry.EventType eventType = rowChage.getEventType();//数据库名String schemaName = entry.getHeader().getSchemaName();//表名String tableName = entry.getHeader().getTableName();log.info("name:[{},{}],eventType:{}",schemaName,tableName,eventType);for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {handleColumn(rowData.getBeforeColumnsList(), eventType, schemaName, tableName);} else {handleColumn(rowData.getAfterColumnsList(), eventType, schemaName, tableName);}}}}//处理canal监测到的数据private void handleColumn(List<CanalEntry.Column> columnsList, CanalEntry.EventType eventType, String schemaName, String tableName) throws Exception{if(schemaName.contains("12306_seat_")){//处理座位变更trainSeatService.handle(columnsList,eventType);}else if(tableName.equals("train_number")){//车次详情处理(实际上是车次信息变更之后才批量处理车次详情)trainNumberService.handle(columnsList,eventType);}else{log.info("drop data,no need care");}}private void safeSleep(int millis) {try {Thread.sleep(100);} catch (Exception e1) {}}}

处理canal监测到的数据(拿到改变的数据,放到实体类中,存到redis中)

package com.next.service;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainSeat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.List;@Service
@Slf4j
public class TrainSeatService {@Resourceprivate TrainNumberMapper trainNumberMapper;@Resourceprivate TrainCacheService trainCacheService;//处理座位,canal通过监听座位库,拿到改变的数据,放到实体类中public void handle(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) {if (eventType != CanalEntry.EventType.UPDATE) {log.info("not update,no need care");return;}TrainSeat trainSeat = new TrainSeat();boolean isStatusUpdated = false;for (CanalEntry.Column column : columns) {//票的状态改变了才做下面的操作if (column.getName().equals("status")) {trainSeat.setStatus(Integer.parseInt(column.getValue()));if (column.getUpdated()) {isStatusUpdated = true;} else {break;}} else if (column.getName().equals("id")) {trainSeat.setId(Long.parseLong(column.getValue()));} else if (column.getName().equals("carriage_number")) {trainSeat.setCarriageNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("row_number")) {trainSeat.setRowNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("seat_number")) {trainSeat.setSeatNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("train_number_id")) {trainSeat.setTrainNumberId(Integer.parseInt(column.getValue()));} else if (column.getName().equals("ticket")) {trainSeat.setTicket(column.getValue());} else if (column.getName().equals("from_station_id")) {trainSeat.setFromStationId(Integer.parseInt(column.getValue()));} else if (column.getName().equals("to_station_id")) {trainSeat.setToStationId(Integer.parseInt(column.getValue()));}}if (!isStatusUpdated) {log.info("status not update,no need care");}log.info("train seat update,trainSeat:{}", trainSeat);/*** 数据存到redis* 1.指定座位被占:hash* cacheKey:车次_日期  D386_20231001* field: carriage_row_seat_fromStationId_toStationId* value: 0-空闲 1-占座** 2.每个座位详情剩余的座位数* cacheKey: 车次_日期_count D386_20231001_count* field: fromStationId_toStationId* value: 实际座位数**/TrainNumber trainNumber = trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());//放票if (trainSeat.getStatus() == 1) {trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()+ "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),"0");trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),1l);log.info("seat+1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);//占票} else if (trainSeat.getStatus() == 2) {trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()+ "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),"1");trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),-1l);log.info("seat-1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);} else {log.info("status update not 1 or 2,no need care");}}}

在这里插入图片描述

参考文档:canal使用说明文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2660040.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

【Gradio】1、Gradio 是什么

官网&#xff1a;https://www.gradio.app/ 一、Gradio 是什么 Gradio是一个用于创建机器学习模型交互式界面的 Python 库。通过Gradio&#xff0c;可以快速地为模型构建一个可视化的、易于使用的Web界面&#xff0c;无需编写任何Web前端代码。 Gradio 支持多种不同类型的输入…

代码随想录算法训练营第三十天|332.重新安排行程、51. N皇后 、37. 解数独

332.重新安排行程 题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 文档讲解&#xff1a;代码随想录 C代码&#xff1a; class Solution { public: unordered_map<string, map<string, int>> targets;bool backtrack…

Python实现张万森下雪了的效果

写在前面 即将步入婚宴殿堂的女主林北星&#xff0c;遭遇了男友展宇的毁约&#xff0c;生活和工作也变得一团糟。与此同时&#xff0c;她被时光老人带回了十八岁的高三时光&#xff0c;重新开启了自己的人生。林北星摆脱了展宇的束缚&#xff0c;认真准备高考&#xff0c;想要…

九九乘法表c 语言 用于打印九九乘法表

以下是一个简单的C语言程序&#xff0c;用于打印九九乘法表&#xff1a; #include <stdio.h>int main() {int i, j;for (i 1; i < 9; i) {for (j 1; j < i; j) {printf("%d*%d%-2d ", j, i, i*j);}printf("\n");}return 0; }解释&#xff1…

html-css-js移动端导航栏底部固定+i18n国际化全局

需求&#xff1a;要做一个移动端的仿照小程序的导航栏页面操作&#xff0c;但是这边加上了i18n国家化&#xff0c;由于页面切换的时候会导致国际化失效&#xff0c;所以写了这篇文章 1.效果 切换页面的时候中英文也会跟着改变&#xff0c;不会导致切换后回到默认的语言 2.实现…

接口自动化框架设计之参数传递

在我们设计自动化测试框架的时候&#xff0c;我们会经常将测试数据保存在外部的文件&#xff08;如Excel、YAML&#xff09;中&#xff0c;实现测试脚本与测试数据解耦&#xff0c;方便后期维护。 当涉及到业务场景接口用例时&#xff0c;由于接口与接口存在关联关系&#xff…

Mysql For Navicate (老韩)

Navicate创建数据库 先创建一个数据库;然后在数据库中创建一张表;在表格当中填入相应的属性字段;打开表, 然后填入相应的实例字段; – 使用数据库图形化App和使用指令来进行操作各有各的好处和利弊; 数据库的三层结构(破除MySQL神秘) 所谓安装Mysql数据库, 就是在主机安装一…

线程基础知识(三)

前言 之前两篇文章介绍了线程的基本概念和锁的基本知识&#xff0c;本文主要是学习同步机制&#xff0c;包括使用synchronized关键字、ReentrantLock等&#xff0c;了解锁的种类&#xff0c;死锁、竞争条件等并发编程中常见的问题。 关键字synchronized synchronied关键字可…

java设计模式学习之【备忘录模式】

文章目录 引言备忘录模式简介定义与用途实现方式 使用场景优势与劣势在Spring框架中的应用备忘录示例代码地址 引言 想象一下&#xff0c;你正在编辑一篇重要的文档&#xff0c;突然你意识到最近的一些更改实际上破坏了文档的结构。幸运的是&#xff0c;你的文本编辑器允许你撤…

【react项目开发遇到的问题记录】:The above error occurred in the <Route.Provider> component:

在进行路由切换会出现一下报错&#xff1a;The above error occurred in the <Route.Provider> component: 解决办法 在使用组件懒加载的时候&#xff0c;在外面套一个react的组件:Suspense Suspense 目前在 react 中一般配合 lazy 使用&#xff0c;当有一些组件需要动…

NLP论文阅读记录 - 02 | 2022 自动文本摘要方法:综合回顾

文章目录 前言0、论文摘要一、Introduction1.1文本摘要的要求1.2主要研究贡献 二.ATS的分类2.1基于没有。输入文档的数量2.2 基于总结方法2.3 基于输出摘要性质&#xff1a;2.4 基于摘要语言2.4.1 基于摘要算法2.5 基于摘要内容2.6 基于摘要类型2.7 基于概括域2.8 基于加工水平…

PyTorch 进阶指南,10个必须知道的原则

PyTorch 是一种流行的深度学习框架&#xff0c;它提供了强大的工具和灵活的接口&#xff0c;使得开发者能够搭建和训练各种神经网络模型。这份指南旨在为开发者提供一些有用的原则&#xff0c;以帮助他们在PyTorch中编写高效、可维护和可扩展的代码。 如果你对 Pytorch 还处于…

软件测试面试中90%会遇到的问题,面试前刷提高百分之60的通过率

面试的时候&#xff0c;遇到这样的提问&#xff0c;很多人的都会感觉脑子一下一片空白&#xff0c;或者星星点点&#xff0c;不知道从何说起。 一方面不知道面试官问这个问题的意图是什么&#xff1f;也不知道他想得到的答案是什么&#xff1f; 更加不知道该从哪些方面来回答…

在香橙派5 Plus上搭建Gitlab

作为一个码农&#xff0c;一定知道Github这个最大的成人交友网站。但是Github在国内不稳定&#xff0c;经常拉不下来代码&#xff0c;也就无法推送代码。为了更方便的使用&#xff0c;顺便更好地了解Git工具&#xff0c;决定在香橙派5 Plus上搭建一个属于自己的代码仓库。 1、…

Grafana 配置告警

配置告警 配置告警 1. Grafana 配置文件配置 #################################### SMTP / Emailing ########################## [smtp] enabled true host smtp.qq.com:587 user 9**qq.com # If the password contains # or ; you have to wrap it with triple quotes…

lv13 内核模块动态添加新功能 6

1 动态加载法 即新功能源码与内核其它源码不一起编译&#xff0c;而是独立编译成内核的插件(被称为内核模块&#xff09;文件.ko 1.1 新功能源码与Linux内核源码在同一目录结构下时 给新功能代码配置Kconfig&#xff08;模块代码与上一级相同&#xff09; 给新功能代码改写…

软件性能测试如何分类?广东省CMA、CNAS软件检测机构有哪些?

性能测试是衡量软件产品质量的有效手段&#xff0c;是保证软件产品可靠性和稳定性的重要环节之一&#xff0c;主要是通过对软件运行的各种参数和行为进行检测以评估软件的性能&#xff0c;在软件质量保证中有着十分重要的作用。 一、软件性能测试如何分类?   1. 负载测试  …

基于Springboot+vue医院管理系统(前后端分离)

最近有一些读者问我有没有完整的基于SpringbootVue的项目源码&#xff0c;今天给大家整理了一下&#xff0c;无偿分享给大家。 功能&#xff1a; 医生信息管理 换着信息管理 挂号信息管理 药物信息管理 检查项目管理 病床信息管理 排班信息管理 数据统计分析 开发工具…

Python第五章(列表)

列表的书写格式&#xff1a;[数据1&#xff0c;数据2&#xff0c;数据3.....] 作用&#xff1a;可以存储多个数据&#xff0c;且可以为不同的数据类型 一。列表的常规操作&#xff1a; 1。查找&#xff1a; index()&#xff1a;返回类型的下标位置 语法&#xff1a;列表序…

学之思开源考试系统是一款 java + vue 的前后端分离的考试系统

学生系统功能 模块介绍登录用户名、密码注册年级、用户名、密码任务中心管理员发布的年级任务&#xff0c;每个学生只能做一次考试题干支持文本、图片、数学公式、表格等&#xff0c;学生答题支持&#xff1a;文本固定试卷可重复练习、自行批改的试卷时段试卷在时间限制内&…