官网地址:Maxwell's Daemon
版本:v1.29.2(之后的版本不支持JDK8)
一、工作原理
Maxwell把自己伪装成MySQL的一个slave,然后以slave的身份假装从MySQL(master)复制数据。
MySQL配置binlog:
1.配置文件:Linux:/etc/my.cnf;Windows:\my.ini
2.在[mysqld]区块设置添加log-bin=mysql-bin
3.binlog分类设置binlog_format=row(可选statement|mixed|row)
Maxwell想做监控分享,选择row(行级,binlog会记录每次操作后每行记录的变化)格式比较合适,优点:保持数据的绝对一致性(因为不管sql是什么,引用了什么函数,binlog只记录执行后的效果);缺点:占用空间大。
二、Maxwell与Canal的对比
对比 | Canal | Maxwell |
语言 | Java | Java |
数据格式 | 格式自由 | json |
采集数据模式 | 增量 | 全量/增量 |
数据落地 | 定制 | 支持kafka等多种平台 |
HA | 支持 | 支持 |
三、安装部署
1.解压:tar -zxvf maxwell-1.29.2.tar.gz
2.初始化Maxwell元数据库
(1)在MySQL中创建一个maxwell库用于存储Maxwell元数据
CREATE DATABASE maxwell;(2)分配一个账号可以操作该数据库
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '密码';(3)分配这个账号可以监控其他数据库的权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'maxwell'@'%';(4)刷新MySQL表权限
flush privileges;
3.Maxwell进程启动
进入Maxwell安装目录
(1)使用命令行参数启动Maxwell进程
./bin/maxwell --user='maxwell' --password='123456' --host='主机名' --producer=stdout
--user 连接mysql的用户
--password 连接mysql的用户密码
--host mysql安装的主机名或ip
--producer 生产者模式(stdout:控制台;kafka:kafka集群)
(2)修改配置文件,定制化启动Maxwell进程
cp config.properties.example config.properties
vim config.properties
./bin/maxwell --config ./config.properties
四、监控mysql数据输出到控制台
1.向test_maxwell库的test表插入一条数据,查看maxwell的控制台输出
mysql> insert into test values(1,'aaa');{"database": "test_maxwell", --库名"table": "test", --表名"type": "insert", --数据更新类型"ts": 1664438392, --操作时间"xid": 6666, --操作id"commit": true, --提交成功"data": { --数据"id": 1,"name": "aaa"}
}mysql> update test set name='bbb' where id =1;
2.修改test_maxwell库的test表插入一条数据,查看maxwell的控制台输出
mysql> update test set name='bbb' where id =1;{"database": "test_maxwell", --库名"table": "test", --表名"type": "update", --数据更新类型"ts": 1664438567, --操作时间"xid": 6667, --操作id"commit": true, --提交成功"data": { --修改后的数据"id": 1,"name": "bbb"},"old": { --修改前的数据"id": 1,"name": "aaa"}
}
3.删除test_maxwell库的test表插入一条数据,查看maxwell的控制台输出
mysql> delete from where id =1;{"database": "test_maxwell", --库名"table": "test", --表名"type": "delete", --数据更新类型"ts": 1664439235, --操作时间"xid": 6668, --操作id"commit": true, --提交成功"data": { --删除的数据"id": 1,"name": "bbb"}
}
五、监控mysql数据输出到kafka
1.实现步骤
(1)启动zookeeper和kafka
(2)启动Maxwell监控binlog
./bin/maxwell --user='maxwell' --password='123456' --host='localhost' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell
(3)打开kafka控制台的消费者消费maxwell主题
(4)向test_maxwell库的test表再次插入一条数据
mysql> insert into test values (5, 'eee');
(5)通过kafka消费者来查看到了数据,说明成功传入kafka
2.kafka主题数据的分区控制
Maxwell监控多个mysql库的数据,然后将这些数据发往kafka的一个主题Topic,并且这个主题是多分区的,为了提高并发度,实现步骤如下:
# 修改Maxwell的配置文件config.properties
# 修改生产者模式
producer=kafka
kafka.bootstrap.servers=192.168.10.1:9092# 手动在kafka创建一个多分区的主题后,在Maxwell配置该主题
# ./kafka-topic.sh --zookeeper 192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181 --create --replication-factor 2 --partitions 3 --topic maxwell3
kafka.topic=maxwell3# 分区参数配置
# 分区方式[database, table, primary_key, transaction_id, thread_id, column]
producer_partition_by=database
# 如果用column分区,需指定列名和备用分区方式
# producer_partition_columns=id,foo,bar
# 备用分区方式,以防指定列不存在
# producer_partition_by_fallback=databse
六、监控mysql指定表数据输出(过滤)
./bin/maxwell --user='maxwell' --password='123456' --host='localhost' --filter 'exclude: *.*, include: test_maxwell.test' --producer=stdout
--filter 'exclude: *.*, include: test_maxwell.test'
过滤:排除所有库.表,包含test_maxwell库的test表
七、监控mysql指定表数据全量输出(数据初始化)
insert into maxwell.bootstrap(database_name, table_name) values("test_maxwell", "test");
同步前maxwell.bootstrap表新增数据的is_complete为0,表示待同步;启动maxwell同步后is_complete为1,started_at、completed_at更新。