Maxwell 介绍
Maxwell 是由美国 zendesk 开源,用 java 编写的 Mysql 实时抓取软件, 其抓取的 原理也是基于 binlog。
官网
https://maxwells-daemon.io/
Maxwell 和 canal 工具对比
➢ Maxwell 没有 canal 那种 server+client 模式,只有一个 server 把数据发送到消息队列 或 redis。如果需要多个实例,通过指定不同配置文件启动多个进程。
➢ Maxwell 有一个亮点功能,就是 canal 只能抓取最新数据,对已存在的历史数据没有办
法处理。而 Maxwell 有一个 bootstrap 功能,可以直接引导出完整的历史数据用于初
始化,非常好用。
➢ Maxwell 不能直接支持 HA,但是它支持断点还原,即错误解决后重启继续上次点儿读
取数据,canal也支持。
➢ Maxwell 只支持 json 格式,而 Canal 如果用 Server+client 模式的话,可以自定义格
式。
➢ Maxwell 比 Canal 更加轻量级。
安装使用
前提开启了mysql的binlog在cannel使用的时候已经讲解
前提
创建保存断点续传的数据库,并且创建maxwell用户密码为123456,对于shishimaxwell数据库给maxwell授予权限
mysql -uroot -p123456
CREATE DATABASE shishimaxwell ;
#maxwell是用戶名
GRANT ALL ON shishimaxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
#maxwell是用戶名
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
tar -zxvf maxwell-1.25.0.tar.gz
修改配置
cp config.properties.example
vi config.properties
producer=kafka
kafka.bootstrap.servers=master:9092,node1:9092,node2:9092
#需要添加
kafka_topic=gmall_db# mysql login info,这里的mysql既是开启binlog的mysql,也是自己创建保存读取位置shishimaxwell的mysql
#数据库host
host=master
#数据库用户
user=maxwell
#数据库密码
password=123456#需要添加 后续bootstrap初始化会用
client_id=maxwell_1
#指定消费位置保存的数据库
schema_database=shishimaxwell
注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱
binlog 的顺序
如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 producer_partition_by 属性
可选值 producer_partition_by=database|table|primary_key|random| column
修改表结构注意
如果是修改了表的结构,修改以后打印的数据为
ADD COLUMN `xinde` varchar(255) NULL COMMENT '修改时间' AFTER `operate_time`" to gmall, new schema id is 2tbeat=0] after applying "ALTER TABLE `gmall`.`comment_info`
但是如果没有配置schema_host那么修改表结构的数据是不会发送到kafka的,也就是说普通模式只会吧增删改查的数据发送到kafka
启动
/home/bigdata/shishishell/maxwell/maxwell-1.25.0/bin/maxwell --config /home/bigdata/shishishell/maxwell/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
查看数据库是否生成了表
数据产生的格式
增
{"database": "gmall","table": "comment_info","type": "insert","ts": 1657694285,"xid": 79219,"commit": true,"data": {"id": 1547108081146556427,"user_id": 682,"nick_name": null,"head_img": null,"sku_id": 24,"spu_id": 8,"order_id": 6897,"appraise": "1204","comment_txt": "评论内容:82154639985516955573787189871861313164594735727667","create_time": "2022-06-28 14:38:04","operate_time": null}
}
删
{"database": "gmall","table": "base_dic","type": "delete","ts": 1657694385,"xid": 111301,"commit": true,"data": {"dic_code": "1101","dic_name": "支付宝","parent_code": "11","create_time": null,"operate_time": null}
}
改
{"database": "gmall","table": "base_dic","type": "update","ts": 1657694417,"xid": 111341,"commit": true,"data": {"dic_code": "1103","dic_name": "银联1","parent_code": "11","create_time": null,"operate_time": null},"old": {"dic_name": "银联"}
}
bootstap全量
--client_id maxwell_1就是上面配置的东西,它的作用是bootstap只是查询出了数据,而--client_id maxwell_1是指点配置文件里面使用maxwell的一个客户端同步数据
bin/maxwell-bootstrap --user maxwell --password 123456 --host master --database gmall --table user_info --client_id maxwell_1
生成的数据如下
开始的一条数据为空
{"database": "gmall","table": "user_info","type": "bootstrap-start","ts": 1657718813,"data": {}
}
{"database": "gmall","table": "user_info","type": "bootstrap-insert","ts": 1657718505,"data": {"id": 6800,"login_name": "m9ml9e7xq4bx","nick_name": "寒寒","passwd": null,"name": "元寒","phone_num": "13132628314","email": "m9ml9e7xq4bx@yeah.net","head_img": null,"user_level": "1","birthday": "1998-05-28","gender": "F","create_time": "2022-06-29 04:49:45","operate_time": null,"status": null}
}
总结数据特点
➢ 日志结构
canal 每一条 SQL 会产生一条日志,如果该条 Sql 影响了多行数据,则已经会通过集
合的方式归集在这条日志中 。(即使是一条数据也会是数组结构)
maxwell 以影响的数据为单位产生日志,即每影响一条数据就会产生一条日志。如果
想知道这些日志是否是通过某一条 sql 产生的可以通过 xid 进行判断,相同的 xid 的日志来
自同一 sql。
➢ 数字类型
当原始数据是数字类型时, maxwell 会尊重原始数据的类型不增加双引 ,变为字符串。
canal 一律转换为字符串。
➢ 带原始数据字段定义
canal 数据中会带入表结构 。maxwell 更简洁。
配置文件模板
log_level=info#producer=stdout
producer=kafka
kafka.compression.type=snappy
kafka.batch.size=10000
kafka.request.timeout.ms = 360000
kafka.retries=3
kafka.acks=-1
kafka.bootstrap.servers=ip:9092,ip:9092,ip:9092
kafka_topic=%{database}_%{table}
#output_ddl=true
producer_partition_by=primary_key
kafka_partition_hash=murmur3# mysql login info
host=localhost
port=3306
user=maxwell
password=密码replication_host=mysql_host
replication_user=username
replication_password=password
replication_port=3306
filter=exclude: *.*, include: 数据库.表, include: 数据库.表, include: 数据库.表
jdbc_options=serverTimeZone=Asia/Shanghai&characterEncoding=utf-8&useSSL=false
client_id=maxwell_1
replica_server_id=1
下面文章还行
Maxwell安装、配置、脚本制作_maxwell-bootstrap_江畔独步的博客-CSDN博客