目录
一、canal服务端
1.1 下载
1.2 解压
1.3 配置
1.4 启动
1.5 查看
二、canal客户端(Java编写业务程序)
2.1 SQL脚本
2.2 写POM
2.3 写Yaml
2.4 写业务类
2.4.1.项目结构
2.4.2 Utils.RedisUtil
2.4.3 biz.RedisCanalClientExample
一、canal服务端
1.1 下载
1.2 解压
tar -zxvf canal.deployer-1.1.6.tar.gz 到mycanal文件夹
1.3 配置
修改/mycan/conf/example/instance.properties
文件
-
换成自己的mysql主机master的IP地址
-
换成自己的在mysql新建的canal账户
1.4 启动
注意这个地方需要JDK环境支持才能正常启动,那就补充一下安装JDK
-
Centos7安装JDK8
-
在下载linux64版本的jdk
-
解压后放到自己指定的文件夹
-
配置环境变量:
vim /ect/profile
新增内容后在source /etc/profile
最后java -version
看是否安装成功
-
export JAVA_HOME=/myjava/jdk1.8.0_371
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JRE_HOME/lib/ext:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
-
启动canal--->在canal的bin目录下执行
./startup.sh
1.5 查看
- 查看server日志 在目录mycanal/logs/canal/下执行cat canal.log
- 查看样例example的日志 在目录
mycanal/logs/example/
下执行cat example.log
二、canal客户端(Java编写业务程序)
2.1 SQL脚本
CREATE TABLE `t_user`(`id` BIGINT(20) NOT NULL AUTO_INCREMENT,`userName` VARCHAR(100) NOT NULL,PRIMARY KEY(`id`)
)ENGINE=INNODB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
2.2 写POM
<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.14</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.16</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency>
</dependencies>
2.3 写Yaml
server:port: 5555spring:datasource:url: jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: rootpassword: 980918driver-class-name: com.mysql.jdbc.Driverdruid:test-while-idle: false
2.4 写业务类
2.4.1.项目结构
2.4.2 Utils.RedisUtil
package com.atguigu.canal.utils;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** @author zhumq* @date 2024/7/27 9:24*/
public class RedisUtils
{public static final String REDIS_IP_ADDR = "192.168.111.185";public static final String REDIS_pwd = "111111";public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);}public static Jedis getJedis() throws Exception {if(null!=jedisPool){return jedisPool.getResource();}throw new Exception("Jedispool is not ok");}}
2.4.3 biz.RedisCanalClientExample
package com.atguigu.canal.biz;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.canal.utils.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author zhumq* @date 2024/7/27 9:26*/
public class RedisCanalClientExample
{/*** 表示60秒的常量。* 用于定义某些操作的时间间隔。*/public static final Integer _60SECONDS = 60;/*** Redis服务器的IP地址。* 用于数据存储和检索操作中定位Redis服务器。*/public static final String REDIS_IP_ADDR = "192.168.111.185";/*** 将数据插入Redis。** @param columns 列数据列表,每项包含列名、值和更新标志。* 本方法首先将列数据列表转换为JSON对象,* 然后使用第一列的值作为键,在Redis中存储JSON字符串。** 设计目的可能是以结构化方式在Redis中存储实体的相关属性信息,* 以便于快速检索和使用。*/private static void redisInsert(List<Column> columns){// 创建一个JSON对象来存储列数据JSONObject jsonObject = new JSONObject();// 遍历列数据列表,填充JSON对象for (Column column : columns){// 打印列信息,用于调试或日志记录System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());// 将列名和值添加到JSON对象中jsonObject.put(column.getName(), column.getValue());}// 如果列数据列表不为空,则执行Redis插入操作if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){// 使用第一列的值作为键,序列化的JSON对象作为值,存储到Redis中jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());} catch (Exception e) {// 打印异常堆栈跟踪,用于错误处理或日志记录e.printStackTrace();}}}/*** 删除Redis中的键值对。** 此方法通过接收一列列名和对应的值,构建一个JSON对象。然后,它从这个JSON对象中提取第一个列的值,* 并使用这个值作为Redis键来删除对应的条目。这个方法假设Redis已经连接,并且通过RedisUtils.getJedis()* 提供了Jedis实例。** @param columns 列表,包含要删除的Redis键对应的列名和值。*/private static void redisDelete(List<Column> columns){// 构建一个JSON对象,用于存储列名和对应的值JSONObject jsonObject = new JSONObject();for (Column column : columns){// 将每一列的名称和值添加到JSON对象中jsonObject.put(column.getName(),column.getValue());}// 当列表不为空时,尝试删除Redis中的条目if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){// 使用列表中第一个列的值作为键,删除Redis中的对应条目jedis.del(columns.get(0).getValue());}catch (Exception e){// 打印堆栈跟踪,以记录任何异常e.printStackTrace();}}}/*** 更新Redis中的数据。* 该方法接收一个列的列表,将这些列的名称和值存储到JSON对象中,然后将这个JSON对象存储到Redis中。* 此方法主要用于在数据更新后,将更新的列及其值同步到Redis缓存中,以保持数据的一致性。** @param columns 列的列表,每个列包含一个名称、一个值和一个标志位表示该列是否被更新。*/private static void redisUpdate(List<Column> columns){// 创建一个JSON对象,用于存储列的名称和值。JSONObject jsonObject = new JSONObject();for (Column column : columns){// 打印列的名称、值和更新状态,用于调试和日志记录。System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());// 将列的名称和值添加到JSON对象中。jsonObject.put(column.getName(),column.getValue());}// 检查列表是否为空,如果不为空,则更新Redis。if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){// 使用列列表中第一个列的值作为键,将JSON对象序列化为字符串后存储到Redis中。jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());// 打印更新后的Redis数据,用于调试和确认更新是否成功。System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));}catch (Exception e){// 捕获并打印任何异常,确保方法在异常情况下不会中断执行。e.printStackTrace();}}}/*** 打印日志条目中的变更信息。* 此方法忽略事务开始和结束的日志条目,因为它只对实际的数据变更感兴趣。* 它解析每条日志条目中的RowChange数据,并根据变更类型(插入、删除、更新)执行相应的操作。** @param entrys 日志条目的列表,这些条目包含数据库变更的信息。*/public static void printEntry(List<Entry> entrys) {// 遍历每个日志条目for (Entry entry : entrys) {// 跳过事务开始和结束的日志条目if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {// 从日志条目的存储值中解析RowChange对象// 获取变更的row数据rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {// 如果解析失败,抛出运行时异常,并包含错误详情throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);}// 获取事件类型 获取变动类型EventType eventType = rowChage.getEventType();// 打印日志条目的基本信息,包括日志文件名、偏移量、模式名、表名和事件类型System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));// 遍历RowData列表,根据事件类型执行相应的操作(插入、删除、更新)for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.INSERT) {// 对于插入事件,调用redisInsert方法处理后的列数据redisInsert(rowData.getAfterColumnsList());} else if (eventType == EventType.DELETE) {// 对于删除事件,调用redisDelete方法处理前的列数据redisDelete(rowData.getBeforeColumnsList());} else {// EventType.UPDATE// 对于更新事件,调用redisUpdate方法处理后的列数据redisUpdate(rowData.getAfterColumnsList());}}}}/*** 程序入口主方法,用于初始化并连接Canal服务器,以监听MySQL数据库的变化。* @param args 命令行参数*/public static void main(String[] args){// 初始化时的提示信息System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");// 创建Canal客户端连接器,用于连接和通信//=================================// 创建链接canal服务端CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),"example","","");// 定义每次获取的记录数量int batchSize = 1000;// 定义空闲循环计数器,用于判断是否需要重新连接// 空闲空转计数器int emptyCount = 0;// 连接初始化完成的提示信息System.out.println("---------------------canal init OK,开始监听mysql变化------");try {// 连接Canal服务器connector.connect();// 订阅指定的数据库表变更事件//connector.subscribe(".*\\..*");connector.subscribe("bigdata.t_user");// 回滚事务,确保数据一致性connector.rollback();// 定义空闲循环的总次数int totalEmptyCount = 10 * _60SECONDS;while (emptyCount < totalEmptyCount) {// 每秒打印一次监控信息System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());// 获取一批变更记录Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据// 获取批次IDlong batchId = message.getId();// 获取记录数量int size = message.getEntries().size();// 如果批次ID为-1或记录数量为0,表示没有数据变更if (batchId == -1 || size == 0) {// 空闲计数器加1emptyCount++;// 每秒检查一次try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {// 有数据变更,重置空闲计数器//计数器重新置零emptyCount = 0;// 打印变更记录printEntry(message.getEntries());}// 确认处理完成,提交批次connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}// 监听结束的提示信息System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");} finally {// 断开与Canal服务器的连接connector.disconnect();}}
}
题外话:
-
Java程序下
connector.subscribe
配置的过滤正则
关闭资源简写
-
try-with-resources
释放资源