Canal监听Mysql回写Redis

目录

一、canal服务端

1.1 下载

1.2 解压

1.3 配置

1.4 启动

1.5 查看

二、canal客户端(Java编写业务程序)

2.1 SQL脚本

2.2 写POM

2.3 写Yaml

2.4 写业务类

2.4.1.项目结构

2.4.2 Utils.RedisUtil

2.4.3 biz.RedisCanalClientExample


一、canal服务端

1.1 下载

1.2 解压

tar -zxvf canal.deployer-1.1.6.tar.gz 到mycanal文件夹

1.3 配置

修改/mycan/conf/example/instance.properties文件

  • 换成自己的mysql主机master的IP地址

  • 换成自己的在mysql新建的canal账户

1.4 启动

注意这个地方需要JDK环境支持才能正常启动,那就补充一下安装JDK

  • Centos7安装JDK8

    • 在下载linux64版本的jdk

    • 解压后放到自己指定的文件夹

    • 配置环境变量:vim /ect/profile新增内容后在source /etc/profile 最后java -version 看是否安装成功

export JAVA_HOME=/myjava/jdk1.8.0_371
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JRE_HOME/lib/ext:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
  • 启动canal--->在canal的bin目录下执行 ./startup.sh

1.5 查看

  • 查看server日志 在目录mycanal/logs/canal/下执行cat canal.log
  • 查看样例example的日志 在目录mycanal/logs/example/下执行cat example.log

二、canal客户端(Java编写业务程序)

2.1 SQL脚本

CREATE TABLE `t_user`(`id` BIGINT(20) NOT NULL AUTO_INCREMENT,`userName` VARCHAR(100) NOT NULL,PRIMARY KEY(`id`)
)ENGINE=INNODB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

2.2 写POM

<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.14</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.16</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency>
</dependencies>

2.3 写Yaml

server:port: 5555spring:datasource:url: jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: rootpassword: 980918driver-class-name: com.mysql.jdbc.Driverdruid:test-while-idle: false

2.4 写业务类

2.4.1.项目结构

2.4.2 Utils.RedisUtil

package com.atguigu.canal.utils;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;/*** @author zhumq* @date 2024/7/27 9:24*/
public class RedisUtils
{public static final String  REDIS_IP_ADDR = "192.168.111.185";public static final String  REDIS_pwd = "111111";public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);}public static Jedis getJedis() throws Exception {if(null!=jedisPool){return jedisPool.getResource();}throw new Exception("Jedispool is not ok");}}

2.4.3 biz.RedisCanalClientExample

package com.atguigu.canal.biz;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.canal.utils.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author zhumq* @date 2024/7/27 9:26*/
public class RedisCanalClientExample
{/*** 表示60秒的常量。* 用于定义某些操作的时间间隔。*/public static final Integer _60SECONDS = 60;/*** Redis服务器的IP地址。* 用于数据存储和检索操作中定位Redis服务器。*/public static final String REDIS_IP_ADDR = "192.168.111.185";/*** 将数据插入Redis。** @param columns 列数据列表,每项包含列名、值和更新标志。*                本方法首先将列数据列表转换为JSON对象,*                然后使用第一列的值作为键,在Redis中存储JSON字符串。**                设计目的可能是以结构化方式在Redis中存储实体的相关属性信息,*                以便于快速检索和使用。*/private static void redisInsert(List<Column> columns){// 创建一个JSON对象来存储列数据JSONObject jsonObject = new JSONObject();// 遍历列数据列表,填充JSON对象for (Column column : columns){// 打印列信息,用于调试或日志记录System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());// 将列名和值添加到JSON对象中jsonObject.put(column.getName(), column.getValue());}// 如果列数据列表不为空,则执行Redis插入操作if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){// 使用第一列的值作为键,序列化的JSON对象作为值,存储到Redis中jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());} catch (Exception e) {// 打印异常堆栈跟踪,用于错误处理或日志记录e.printStackTrace();}}}/*** 删除Redis中的键值对。** 此方法通过接收一列列名和对应的值,构建一个JSON对象。然后,它从这个JSON对象中提取第一个列的值,* 并使用这个值作为Redis键来删除对应的条目。这个方法假设Redis已经连接,并且通过RedisUtils.getJedis()* 提供了Jedis实例。** @param columns 列表,包含要删除的Redis键对应的列名和值。*/private static void redisDelete(List<Column> columns){// 构建一个JSON对象,用于存储列名和对应的值JSONObject jsonObject = new JSONObject();for (Column column : columns){// 将每一列的名称和值添加到JSON对象中jsonObject.put(column.getName(),column.getValue());}// 当列表不为空时,尝试删除Redis中的条目if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){// 使用列表中第一个列的值作为键,删除Redis中的对应条目jedis.del(columns.get(0).getValue());}catch (Exception e){// 打印堆栈跟踪,以记录任何异常e.printStackTrace();}}}/*** 更新Redis中的数据。* 该方法接收一个列的列表,将这些列的名称和值存储到JSON对象中,然后将这个JSON对象存储到Redis中。* 此方法主要用于在数据更新后,将更新的列及其值同步到Redis缓存中,以保持数据的一致性。** @param columns 列的列表,每个列包含一个名称、一个值和一个标志位表示该列是否被更新。*/private static void redisUpdate(List<Column> columns){// 创建一个JSON对象,用于存储列的名称和值。JSONObject jsonObject = new JSONObject();for (Column column : columns){// 打印列的名称、值和更新状态,用于调试和日志记录。System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());// 将列的名称和值添加到JSON对象中。jsonObject.put(column.getName(),column.getValue());}// 检查列表是否为空,如果不为空,则更新Redis。if(columns.size() > 0){try(Jedis jedis = RedisUtils.getJedis()){// 使用列列表中第一个列的值作为键,将JSON对象序列化为字符串后存储到Redis中。jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());// 打印更新后的Redis数据,用于调试和确认更新是否成功。System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));}catch (Exception e){// 捕获并打印任何异常,确保方法在异常情况下不会中断执行。e.printStackTrace();}}}/*** 打印日志条目中的变更信息。* 此方法忽略事务开始和结束的日志条目,因为它只对实际的数据变更感兴趣。* 它解析每条日志条目中的RowChange数据,并根据变更类型(插入、删除、更新)执行相应的操作。** @param entrys 日志条目的列表,这些条目包含数据库变更的信息。*/public static void printEntry(List<Entry> entrys) {// 遍历每个日志条目for (Entry entry : entrys) {// 跳过事务开始和结束的日志条目if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {// 从日志条目的存储值中解析RowChange对象// 获取变更的row数据rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {// 如果解析失败,抛出运行时异常,并包含错误详情throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);}// 获取事件类型 获取变动类型EventType eventType = rowChage.getEventType();// 打印日志条目的基本信息,包括日志文件名、偏移量、模式名、表名和事件类型System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));// 遍历RowData列表,根据事件类型执行相应的操作(插入、删除、更新)for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.INSERT) {// 对于插入事件,调用redisInsert方法处理后的列数据redisInsert(rowData.getAfterColumnsList());} else if (eventType == EventType.DELETE) {// 对于删除事件,调用redisDelete方法处理前的列数据redisDelete(rowData.getBeforeColumnsList());} else {// EventType.UPDATE// 对于更新事件,调用redisUpdate方法处理后的列数据redisUpdate(rowData.getAfterColumnsList());}}}}/*** 程序入口主方法,用于初始化并连接Canal服务器,以监听MySQL数据库的变化。* @param args 命令行参数*/public static void main(String[] args){// 初始化时的提示信息System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");// 创建Canal客户端连接器,用于连接和通信//=================================// 创建链接canal服务端CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),"example","","");// 定义每次获取的记录数量int batchSize = 1000;// 定义空闲循环计数器,用于判断是否需要重新连接// 空闲空转计数器int emptyCount = 0;// 连接初始化完成的提示信息System.out.println("---------------------canal init OK,开始监听mysql变化------");try {// 连接Canal服务器connector.connect();// 订阅指定的数据库表变更事件//connector.subscribe(".*\\..*");connector.subscribe("bigdata.t_user");// 回滚事务,确保数据一致性connector.rollback();// 定义空闲循环的总次数int totalEmptyCount = 10 * _60SECONDS;while (emptyCount < totalEmptyCount) {// 每秒打印一次监控信息System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());// 获取一批变更记录Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据// 获取批次IDlong batchId = message.getId();// 获取记录数量int size = message.getEntries().size();// 如果批次ID为-1或记录数量为0,表示没有数据变更if (batchId == -1 || size == 0) {// 空闲计数器加1emptyCount++;// 每秒检查一次try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {// 有数据变更,重置空闲计数器//计数器重新置零emptyCount = 0;// 打印变更记录printEntry(message.getEntries());}// 确认处理完成,提交批次connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}// 监听结束的提示信息System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");} finally {// 断开与Canal服务器的连接connector.disconnect();}}
}

题外话:

  • Java程序下connector.subscribe配置的过滤正则

关闭资源简写

  • try-with-resources释放资源

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3269593.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

麦田物语第十五天

系列文章目录 麦田物语第十五天 文章目录 系列文章目录一、构建游戏的时间系统二、时间系统 UI 制作总结 一、构建游戏的时间系统 在该游戏中我们要构建年月日天时分秒等时间的概念&#xff0c;从而实现季节的更替&#xff0c;昼夜的更替等&#xff08;不同的季节可以播种不同…

基于gaussian计算NICS值评估分子体系的芳香性和反芳香性

计算分子NICS值的基本流程 其中第1行的Bq即为NICS(0)虚原子对应的位置&#xff0c;7.5538为NICS(0)对应虚原子位置处的各向同性化学位移屏蔽值。 由于NICS值为各向同性化学屏蔽值的负值&#xff0c;因此苯的NICS(0)和NICS(1)分别为-7.5538和-10.5301&#xff0c;这也表明苯分…

通信类IEEE会议——第四届通信技术与信息科技国际学术会议(ICCTIT 2024)

[IEEE 独立出版&#xff0c;中山大学主办&#xff0c;往届均已见刊检索] 第四届通信技术与信息科技国际学术会议&#xff08;ICCTIT 2024&#xff09; 2024 4th International Conference on Communication Technology and Information Technology 重要信息 大会官网&#xf…

python生成系统测试数据

开发系统的时候,为了系统可以更好的进行测试,一般需要准备测试数据,以便可以顺利的对各种场景进行测试,使用两张表来说明怎么快速生成测试数据。 1.用户表 一般登录的时候,需要用到用户表 用户表字段如下: 用户名、密码、姓名、性别、邮箱、手机号、用户类型、地址 下…

Linux进程间通信(管道+共享内存)

进程间通信&#xff08;interprocess communication&#xff0c;简称 IPC&#xff09;指两个进程之间的通信。系统中的每一个进程都有各自的地址空间&#xff0c;并且相互独立、隔离&#xff0c;每个进程都处于自己的地址空间中。所以同一个进程的不同模块&#xff08;譬如不同…

【PyQt5】一文向您详细介绍 setPlaceholderText() 的作用

【PyQt5】一文向您详细介绍 setPlaceholderText() 的作用 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&#xff1a;985高校的普通…

剑和沙盒 6 - 线程辱骂 – 使用线程名称进行攻击

强调&#xff1a; 进程注入是攻击者工具包中的重要技术之一。在下面的文章中 解释了如何滥用线程描述 API 来绕过端点保护产品。提出了一种新的注入技术&#xff1a;Thread Name-Calling&#xff0c;并给出了实施保护的相关建议。 介绍 进程注入是攻击者使用的重要技术之一 。…

《Programming from the Ground Up》阅读笔记:p75-p87

《Programming from the Ground Up》学习第4天&#xff0c;p75-p87总结&#xff0c;总计13页。 一、技术总结 1.persistent data p75, Data which is stored in files is called persistent data, because it persists in files that remain on disk even when the program …

一文理解生成式AI应用的五个级别:Tool、Chatbot、Copilot、Agent 和 Intelligence

当下&#xff0c;很多人对 AI 一知半解&#xff0c;并不能很好地区分&#xff1a;Tool、Chatbot、Copilot、Agent 和 Intelligence 概念之间的区别。 最近读完 《真格基金戴雨森谈生成式AI&#xff1a;这是比移动互联网更大的创业机会&#xff0c;开始行动是关键 》 发现讲的特…

机器人无人机视觉避障常见方式及优缺点总结

视觉避障是一种通过视觉传感器&#xff08;如摄像头&#xff09;捕捉环境图像信息&#xff0c;经过图像处理和计算机视觉算法分析&#xff0c;识别并定位障碍物&#xff0c;进而实现避障的技术。 一、常见方式 机器人无人机视觉避障是指通过视觉传感器获取周围环境信息&#x…

html+css 实现悬浮按钮

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享htmlcss 绚丽效果&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 文…

AI机器人本地免费部署(部署Llama 3.1详细教程)

昨日&#xff0c;Meta公司发布了人工智能模型——Llama 3.1。 那么Llama 3.1 405B的效果怎么样&#xff1f;我们来对比一张图&#xff0c;横向对比一下GPT-4。 可以看出&#xff0c;Llama 3.1 405B在各类任务中的表现可以与GPT-4等顶级的模型相差无几。 那么&#xff0c;我们…

2024年国际高校数学建模大赛(IMMCHE)问题A:金字塔石的运输完整思路代码 结果分享(仅供学习)

2024 International Mathematics Molding Contest for Higher Education Problem A: Transportation of Pyramid Stones&#xff08;2024年国际高校数学建模大赛&#xff08;IMMCHE&#xff09;问题A&#xff1a;金字塔石的运输&#xff09; 金字塔的建造是古埃及文明的杰出成…

嵌入式Python、决策树算法、SQLite、Flask、树莓派、机器学习:基于算法自主决策的智能家居系统(代码示例)

项目概述 随着物联网技术的快速发展&#xff0c;智能家居系统越来越普及&#xff0c;成为现代家庭生活的重要组成部分。本文将介绍一个基于Raspberry Pi和Arduino的智能家居算法控制系统的硬件平台。该系统能够通过传感器采集环境数据&#xff0c;并利用机器学习算法进行分析与…

Unity 资源 之 Pop It 3D 解压玩具与双人AI游戏 Unity 资源包分享

精彩呈现&#xff1a;Pop It 3D 解压玩具与双人AI游戏 Unity 资源包分享 一、Pop It 3D 解压玩具的魅力二、双人游戏的互动乐趣三、Unity 游戏资源包的优势四、如何获取资源包 亲爱的游戏爱好者们&#xff0c;今天为大家带来一款令人兴奋的游戏资源——Pop It 3D 解压玩具双人带…

VMware虚拟机中CentOS7自定义ip地址并且固定ip

配置固定ip(虚拟机) 前提&#xff1a;虚拟机网络配置成&#xff0c;自定义网络并选择VMnet8(NAT 模式) 操作(如下图)&#xff1a;点击虚拟机–》设置–》–》硬件–》网络适配器–》自定义&#xff1a;特定虚拟网络–》选择&#xff1a;VMnet8(NAT 模式) 虚拟机网络设置 需要记…

数据分析中常用的数据分析工具

在数据分析中&#xff0c;常用的数据分析工具种类繁多&#xff0c;它们各自具有不同的特点和优势&#xff0c;适用于不同的数据分析场景和需求。以下是一些常用的数据分析工具分类及具体介绍&#xff1a; 一、Excel生态工具 Excel是微软开发的一款电子表格软件&#xff0c;广…

【最接近原点的 K 个点】python刷题记录

R2-排序算法 有点像快速排序 快排&#xff1f;根本不用 class Solution:def kClosest(self, points: List[List[int]], k: int) -> List[List[int]]:#直接按照欧几里得距离排序points.sort(keylambda x:sqrt(x[0]**2x[1]**2))ret[]for i in range(k):ret.append(points[i]…

NLP-使用Word2vec实现文本分类

Word2Vec模型通过学习大量文本数据&#xff0c;将每个单词表示为一个连续的向量&#xff0c;这些向量可以捕捉单词之间的语义和句法关系。本文做文本分类是结合Word2Vec文本内容text&#xff0c;预测其文本标签label。以下使用mock商品数据的代码实现过程过下&#xff1a; 1、…

干货|永久免费SSL证书申请——七步实现网站HTTPS

在数字化时代&#xff0c;网站的安全性成为了衡量其专业性和可信度的重要标准之一。启用HTTPS协议&#xff0c;即通过安装SSL证书&#xff0c;可以确保数据在用户浏览器和服务器之间传输时的加密性&#xff0c;保护用户隐私和数据安全。对于个人博客、小型企业或预算有限的组织…