2024.07纪念一 debezium : spring-boot结合debezium

使用前提:

一、mysql开启了logibin

在mysql的安装路径下的my.ini中 

【mysqlid】下

添加
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

参考gitee的项目,即拉即用。参考地址

zhanghl/spring-boot-debeziumicon-default.png?t=N7T8https://gitee.com/zhl001/spring-boot-debezium项目中一个三个文件,pom和两个类需要参考。

pom.xml

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.13.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.felord</groupId><artifactId>spring-boot-debezium</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-debezium</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version><debezium.version>1.5.2.Final</debezium.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springboot与mybatis的整合包--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.3.0</version></dependency><!--mysql驱动包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!--springboot与JDBC整合包--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><!--sqlserver数据源--><dependency><groupId>com.microsoft.sqlserver</groupId><artifactId>sqljdbc4</artifactId><version>4.0</version></dependency></dependencies>

两个java类

DebeziumConfiguration.java
package cn.felord.debezium.debezium;import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.List;
import java.util.Map;import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;/*** The type Debezium configuration.** @author n1* @since 2021 /6/1 17:01*/
@Configuration
public class DebeziumConfiguration {/*** Debezium 配置.** @return configuration*/@Beanio.debezium.config.Configuration debeziumConfig() {return io.debezium.config.Configuration.create()
//            连接器的Java类名称.with("connector.class", MySqlConnector.class.getName())
//            偏移量持久化,用来容错 默认值.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
//                偏移量持久化文件路径 默认/tmp/offsets.dat  如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
//                如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。.with("offset.storage.file.filename", "/tmp/offsets.dat")
//                捕获偏移量的周期.with("offset.flush.interval.ms", "1")
//               连接器的唯一名称.with("name", "mysql-connector")
//                数据库的hostname.with("database.hostname", "10.1.1.1")
//                端口.with("database.port", "3306")
//                用户名.with("database.user", "canal")
//                密码.with("database.password", "canal")
//                 包含的数据库列表,你的数据库.with("database.include.list", "md_test")
//                是否包含数据库表结构层面的变更,建议使用默认值true.with("include.schema.changes", "false")
//                mysql.cnf 配置的 server-id.with("database.server.id", 1)
//                	MySQL 服务器或集群的逻辑名称.with("database.server.name", "customer-mysql-db-server")
//                历史变更记录.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
//                历史变更记录存储位置,存储DDL.with("database.history.file.filename", "/tmp/dbhistory.dat").build();}/*** Debezium server bootstrap debezium server bootstrap.** @param configuration the configuration* @return the debezium server bootstrap*/@BeanDebeziumServerBootstrap debeziumServerBootstrap(io.debezium.config.Configuration configuration) {DebeziumServerBootstrap debeziumServerBootstrap = new DebeziumServerBootstrap();DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(configuration.asProperties()).notifying(this::handlePayload).build();debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);return debeziumServerBootstrap;}private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {recordChangeEvents.forEach(r -> {SourceRecord sourceRecord = r.record();String topic = sourceRecord.topic();Struct sourceRecordChangeValue = (Struct) sourceRecord.value();if (sourceRecordChangeValue != null) {// 判断操作的类型 过滤掉读 只处理增删改   这个其实可以在配置中设置Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));if (operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;// 获取增删改对应的结构体数据Struct struct = (Struct) sourceRecordChangeValue.get(record);// 将变更的行封装为MapMap<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// 这里简单打印一下System.out.println("operation = " + operation);System.out.println("data = " + payload);if(operation.toString().equals("CREATE")){System.out.println("新增记录一条");}//tabelNameif(topic.split("\\.").length > 2){String tableName = topic.split("\\.")[2];System.out.println("tabelName" + tableName);}}}});}}
DebeziumServerBootstrap.java
package cn.felord.debezium.debezium;import io.debezium.engine.DebeziumEngine;
import lombok.Data;
import lombok.SneakyThrows;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;import java.util.concurrent.Executor;
import java.util.concurrent.Executors;/*** @author n1* @since 2021/6/2 10:45*/
@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {private final Executor executor = Executors.newSingleThreadExecutor();private DebeziumEngine<?> debeziumEngine;@Overridepublic void start() {executor.execute(debeziumEngine);}@SneakyThrows@Overridepublic void stop() {debeziumEngine.close();}@Overridepublic boolean isRunning() {return false;}@Overridepublic void afterPropertiesSet() throws Exception {Assert.notNull(debeziumEngine, "debeziumEngine must not be null");}
}

 结语:

太强了,比canal强10倍,非侵入性。对比可参考:

不想引入MQ?不妨试试 Debezium-CSDN博客

为什么是debezium

这么多技术框架,为什么选debezium?

看起来很多。但一一排除下来就debezium和canal。

sqoop,kettle,datax之类的工具,属于前大数据时代的产物,地位类似于web领域的structs2。而且,它们基于查询而非binlog日志,其实不属于CDC。首先排除。

flink cdc是大数据领域的框架,一般web项目的数据量属于大材小用了。

同时databus,maxwell相对比较冷门,用得比较少。

「最后不用canal的原因有以下几点:」

  • canal需要安装,这违背了“如非必要,勿增实体”的原则。

  • canal只能对MYSQL进行CDC监控。有很大的局限性。

  • 大数据领域非常流行的flink cdc(阿里团队主导)底层使用的也是debezium,而非同是阿里出品的canal。

  • debezium可借助kafka组件,将变动的数据发到kafka topic,后续的读取操作只需读取kafka,可有效减少数据库的读取压力。可保证一次语义,至少一次语义。

  • 同时,也可基于内嵌部署模式,无需我们手动部署kafka集群,可满足”如非必要,勿增实体“的原则。

  • 而且canal只支持源端MySQL版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

实时监视同步数据库变更,这个框架真是神器_Mysql

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3280765.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

收藏!国内外GPU算力厂商详细盘点

如今&#xff0c;图形处理器&#xff08;GPU&#xff09;作为计算领域的核心部件&#xff0c;其算力性能直接决定了诸多应用场景的效率和效果。从深度学习、科学计算到视频处理&#xff0c;GPU的算力已成为衡量技术实力的重要指标。本文将详细盘点当前市场上GPU算力领先的厂商&…

iOS开发-图片上涂鸦绘制撤销功能

iOS开发-图片上涂鸦绘制撤销功能 当我们需要重新在图片上进行绘制涂鸦生成新的图&#xff0c;这里使用到了Graphics中的API功能。 Graphics Framework是一套基于C的API框架&#xff0c;使用了Quartz作为绘图引擎。它提供了低级别、轻量级、高保真度的2D渲染。 微信搜索小游戏…

单线程 和多线程区别,看打印输出1000个数字效果

执⾏过程: 加载func() -> 执⾏main -> 创建⼦线程t -> ⼦线程t启动 -> 执⾏func中的内容 |-> 继续执⾏main from threading import Thread #此线程不用安装自带。T是大写注意哟 def func():for i in range(1000):print(func,i) #定义一个函数打印 if __name__ …

<数据集>DOTA v1.0遥感航拍目标识别数据集<目标检测>

数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;1869张&#xff08;训练集1411&#xff0c;验证集458&#xff09; 标注数量(xml文件个数)&#xff1a;1869 标注数量(txt文件个数)&#xff1a;1869 标注类别数&#xff1a;15 标注类别名称&#xff1a;[plane, ba…

基于Python的哔哩哔哩国产动画排行数据分析系统

需要本项目的可以私信博主&#xff0c;提供完整的部署、讲解、文档、代码服务 随着经济社会的快速发展&#xff0c;中国影视产业迎来了蓬勃发展的契机&#xff0c;其中动漫产业发展尤为突出。中国拥有古老而又璀璨的文明&#xff0c;仅仅从中提取一部分就足以催生出大量精彩的…

python——joblib进行缓存记忆化-对计算结果缓存

问题场景 在前端多选框需要选取多个数据进行后端计算。 传入后端是多个数据包的对应路径。 这些数据包需要按一定顺序运行&#xff0c;通过一个Bag(path).get_start_time() 可以获得一个float时间值进行排序&#xff0c;但由于数据包的特性&#xff0c;这一操作很占用性能和时…

碰撞检测 | 矩形增量膨胀安全走廊模型(附C++/Python仿真)

目录 0 专栏介绍1 安全走廊建模的动机2 矩形增量膨胀算法3 算法仿真3.1 C实现3.2 Python实现 0 专栏介绍 &#x1f525;课设、毕设、创新竞赛必备&#xff01;&#x1f525;本专栏涉及更高阶的运动规划算法轨迹优化实战&#xff0c;包括&#xff1a;曲线生成、碰撞检测、安全走…

哪个牌子的眼镜清洗机好?买超声波清洗机有必要吗

生活中&#xff0c;我们经常忽视眼镜的清洁。你知道吗&#xff1f;眼镜如果长时间不清洁的话&#xff0c;镜片上的污垢和油脂会让视线变得模糊不清&#xff0c;甚至油污滋生的细菌还可能伤害到我们的眼睛&#xff0c;比如引起眼睛疲劳或炎症。为了保持眼镜干净&#xff0c;现在…

生鲜 52 周 MD如何助力业绩提升

生鲜 52 周 MD &#xff0c;顾名思义&#xff0c;就是以一年 52 周为周期&#xff0c;对生鲜商品进行精细化、动态化的营销规划。它不再是传统的固定化、模式化的销售方式&#xff0c;而是根据每周的季节特点、节日氛围、消费趋势以及市场变化&#xff0c;精心策划生鲜商品的种…

11 优化器

目录 1. 随机梯度下降系优化器&#xff1a;SGD 1.1 算法种类 1.2 优缺点 2 SGDM 即为SGD with momentum 动量 2.1 公式 2.2 动量的优缺点 优点 缺点 2.3 使用场景 3 AdaGrad 3.1 公式 3.2 AdaGrad的优缺点 优点 缺点 3.3 使用场景 3.4 Adam 3.4.1 Adam优化器的…

计算机网络01

文章目录 浏览器输入URL后发生了什么&#xff1f;Linux 系统是如何收发网络包的&#xff1f;Linux 网络协议栈Linux 接收网络包的流程Linux 发送网络包的流程 浏览器输入URL后发生了什么&#xff1f; URL解析 当在浏览器中输入URL后&#xff0c;浏览器首先对拿到的URL进行识别…

2024最全RabbitMQ集群方案汇总

之前在网上找rabbitmq集群方案有哪几种时&#xff0c;基本上看到的答案都不太一样&#xff0c;所以此文的主要目的是梳理一下rabbitmq集群方案&#xff0c;对rabbitmq集群方案的笔记并不是搭建的笔记。 总结了一些文章&#xff0c;rabbitmq集群大概有五种方案&#xff1a;普通…

[Linux安全运维] iptables包过滤

前言 防火墙是网络安全中非常重要的设备&#xff0c;是一种将内部网络和外部网络隔离开的技术。简单来说&#xff0c;防火墙技术就是访问控制技术&#xff0c;由规则和动作组成。 1. Linux 包过滤防火墙 1 .1 概述 iptables&#xff1a; 指的是管理Linux防火墙的命令程序&a…

Windows 下后台启动 jar 包,UTF-8 启动 jar 包_windows启动jar

转自:https://blog.csdn.net/2401_83817971/article/details/137514739 本文介绍了如何使用javaw.exe后台启动Javajar包&#xff0c;如何在Windows中管理和设置cmd编码&#xff0c;以及与Python开发相关的学习资源。包括UTF-8编码启动jar包的方法和Windows下关闭后台服务的技…

superset 不显示mysql的选项问题

superset不显示mysql的选项 数据库驱动未安装&#xff1a;确保你已经安装了Python的MySQL数据库驱动&#xff0c;比如mysqlclient。 pip install mysqlclient

vue一些npm i 时报错问题解决【JAVA前后端分离】

前端vue npm i 安装时出现 作为一个懂些前端得 JAVA开发自然是要粗暴解决这个问题了 问题解决 使用命令 npx -p npm6 npm i 即可编译 原因&#xff1a; ERESOLVE与npm版本有关&#xff0c;因为npm版本高对某些事情比npm6.x更严格。通常&#xff0c;最简单的解决方法是将--…

【机器学习基础】机器学习概述与实践基础

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科&#xff0c;通过算法和模型让计算机从数据中学习&#xff0c;进行模型训练和优化&#xff0c;做出预测、分类和决策支持。Python成为机器学习的首选语言&#xff0c;…

STC12C5A60S2单片机输出pwm的方法

所谓的pwm输出就是让单片机在某个管脚上按照时间输出特定频率和占空比的矩形方波。这里面有两个参数&#xff0c;一是频率&#xff0c;二是占空比。两者互不干涉。以STC12C5A60S2为例子&#xff0c;本51型单片机可以有两路pwm输出。以其中的一路为例说明&#xff1b; 首先说频…

Teltonika FMXXX系列定位器解析说明

1.产品外观 2.数据包说明 2.1.登录包 **模块第一次上线&#xff0c;先发送其对应的IMEI号&#xff0c;数据包如下&#xff1a;** 000F313233343536373839303132333435**数据包解析如下&#xff1a;** 000F --packet starts 313233343536373839303132333435 --IMEI 1234567890…

区块链软硬件协同,做产业数字化转型的“安全官” |《超话区块链》直播预告

今年的两会政府工作报告提出&#xff1a;“产业的数字化&#xff08;行业数字化转型&#xff09;是发展新质生产力的核心&#xff0c;是推动产业升级实现高质量发展的关键。”全面推进产业数字化&#xff0c;需要技术创新与产业应用深入协同&#xff1b;立足可持续发展的长远目…