DataX - 全量数据同步工具

前言

        今天是2024-2-21,农历正月十二,相信今天开始是新的阶段,尽管它不是新的周一、某月一日、某年第一天,尽管我是一个很讲究仪式感的人。新年刚过去 12 天,再过 3 天就开学咯,开学之后我的大学时光就进入了冲刺阶段,之前没完成的目标和习惯务必严格要求自己执行,我也慢慢悟出了解决各种 "病症" 的办法了~

        这里推荐我喜欢的几本书:《黄金时代》、《一直特立独行的猪》、《沉默的大多数》,都是王小波的,对我收益颇深。尽管这博客是写给我自己看的 hahaha

        言归正传,今天学习 DataX,这也是一个大数据工具,和 Maxwell 差不多,它是用来做全量数据同步的,前者主要是做增量数据同步的。


1、概述

1.1、什么是 DataX

        DataX 是阿里巴巴开源的一个异构数据源离线同步工具(区别于 Maxwell、Cannal,这俩是主要是做增量同步的),致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

源码地址:https://github.com/alibaba/DataX

1.2、DataX 的设计

        为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路, DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。

1.3、支持的数据源

类型

数据源

Reader(读)

Writer(写)

RDBMS 关系型数据库

MySQL

Oracle

OceanBase

SQLServer

PostgreSQL

DRDS

通用RDBMS

阿里云数仓数据存储

ODPS

ADS

OSS

OCS

NoSQL数据存储

OTS

Hbase0.94

Hbase1.1

Phoenix4.x

Phoenix5.x

MongoDB

Hive

Cassandra

无结构化数据存储

TxtFile

FTP

HDFS

Elasticsearch

时间序列数据库

OpenTSDB

TSDB

1.4、框架设计

  • Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲, 流控,并发,数据转换等核心技术问题。

1.5、运行原理

  • Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。一个 Job 启动一个进程。
  • Task:根据不同数据源的切分策略,一个 Job 会被切分为多个 Task(由 Split 模块完成),Task 是 DataX 作业的最小单元,每个 Task 负责一部分数据的同步工作。
  • TaskGroup:Scheduler 调度模块会对 Task 进行分组,每个 TaskGroup 负责启动 Task,单个 TaskGroup 的并发数量为 5(最多同时执行 5 个Task,一个 Task 执行完就会释放掉,再进来一个 Task 继续执行)。
  • Reader -> Channel -> Writer :每个 Task 启动后,都会固定启动 Reader -> Channel -> Writer 来完成同步工作。

举例来说,用户提交了一个 DataX 作业,并且配置了 20 个并发,目的是将一个 100 张分表的 mysql 数据同步到 odps 里面。 DataX 的调度决策思路是:

  1. DataXJob 根据分库分表切分成了 100 个 Task。
  2. 根据 20 个并发,DataX 计算共需要分配 4 个 TaskGroup。
  3. 4 个 TaskGroup 平分切分好的 100 个 Task,每一个 TaskGroup 负责以 5 个并发共计运行 25 个 Task。

1.6、与 Sqoop 对比

2、DataX3.0 部署

傻瓜式安装解压,然后执行下面的脚本

python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

运行结果: 

当出现上面的结果说明安装成功,这里我们用的是 DataX 自带的一个测试作业,它是一个 json 格式的文件,之后我们的 DataX 作业也是通过自己 编写 json 文件来实现。

3、DataX 的使用

3.1、DataX 任务提交命令

        DataX的使用十分简单,用户只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可,就像我们安装时测试执行 DataX 任务的操作一样:

python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

3.2、DataX 配置文件格式

可以通过下面这个命令来查看 DataX 配置文件模板:

# -r 代表 reader -w 代表 writer
python bin/datax.py -r mysqlreader -w hdfswriter

        配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。

Reader和Writer的具体参数可参考官方文档,地址:

https://github.com/alibaba/DataX/blob/master/README.md

所以,如果我们需要自定义 DataX 任务的时候,就需要打开官网的 reader 和 writer 文档,查看需要配置哪些参数,接下来我们就来练习一下:

4、使用案例

4.1、MySQL -> HDFS

从 MySQL 写入到 HDFS ,我们就需要去官网查看 MySQLReader 和 HDFSWriter 的内容:

简而言之,MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。

案例要求:同步 gmall 数据库中 base_province 表数据到 HDFS 的 /base_province 目录

需求分析:要实现该功能,需选用 MySQLReader 和 HDFSWriter,MySQLReader 具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。

下面分别使用两种模式进行演示:

4.1.1、MySQLReader & TableMode

1)编写配置文件
vim /opt/module/datax/job/base_province.json
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"where": "id>=3","connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"table": ["base_province"]}],"password": "123456","splitPk": "","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}
2)配置说明
1. Reader 参数说明

注意:这里的 splitPk 参数是数据分片字段,一般是主键,仅支持整型 ,而且只有 TableMode 模式下才有效

2. Writer 参数说明

我们的 hdfswriter 中有一个 column 参数,但是我们知道 HDFS 是没有列的这个概念的。其实,这里代表的是我们Hive表中数据的字段类型,这个配置参数是给 Hive 看的。之后我们在使用 hdfsreader 的时候 依然要配置这个参数,这个参数的意义仍然是 hive 的数据字段。

注意:这里的 fileName 参数指的是 HDFS 前缀名而并不是完整文件名!

3)提交任务

使用DataX向HDFS同步数据时,必须确保目标路径已存在!

hadoop fs -mkdir /base_province
python bin/datax.py job/base_province.json

执行结果:

 可以看到,我们的文件名是由我们 hdfswriter 中指定的前缀 fileName + uuid 组成的。

 查看HDFS 中的文件内容(因为我们的文件是经过 gzip 压缩的,所以网页端查看不了):

hadoop fs -cat /base_province/* | zcat

可以看到,MySQL 中 34 条数据一共写入了 32 条,这是因为我们设置了 where 参数的值为 id>=3 

4.1.2、MySQLReader & QuerySQLMode

1)配置文件
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

        可以看到,TableMode 的 mysqlreader 中是通过在 connection 参数设置 table 参数的值来指定我们的表,而这里 QuerySQLMode 模式是通过 querySql 参数来指定 SQL ,从SQL中可以得到表名。此外,QuerySQLMode 模式没有 columns 和 where 参数,因为这些都可以在 SQL 中指定。

        那 TableMode 和 QuerySQLMode 有什么区别呢?其实 QuerySQLMode 正因为它可以指定 SQL ,所以就更加灵活,我们可以使用复杂的 join 和聚合函数,这一点是 TableMode 所实现不了的。反过来,我们上面知道 TableMode 的配置文件中可以在 mysqlreader 中指定一个参数 splitPk 来开启多个 Task 去读取一张表,这一点同样是 QuerySQLMode 所不具备的,QuerySQLMode 只支持单个 Task。

        QuerySQLMode 这种模式用的还是比较少的,毕竟我们的 Hive 也可以完成数据的聚合和联结。

此外,关于 hdfswriter 还有一些注意事项:

注意事项:

HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(''),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。

解决该问题的方案有两个:

        一是修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑(也就是如果读取到 null 就把它替换为 "\\N",双斜杠是因为 DataX 是 Java 写的),可参考这里。

        二是在Hive中建表时指定null值存储格式为空字符串(''")

2)配置文件说明

mysqlreader:

hdfswriter 和上面的是一样的。

3)提交任务

结果和上面是一样的,这里不再演示。

4.1.3、DataX 传参

        通常情况下,离线数据同步任务需要每日定时重复执行(就像我们之前 flume 上传到 HDFS 也指定过),故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中 HDFS Writer 的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。

1)修改配置文件

        DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,我们只需要修改 hdfswriter 下 parameter 参数下的 path 为:

"path": "/base_province/${dt}"
 2)创建 hdfs 路径
 hadoop fs -mkdir /base_province/2020-06-14
3)提交任务
python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json

4.2、HDFS -> MySQL

案例要求:同步HDFS上的/base_province目录下的数据到 MySQL gmall 数据库下的 test_province表。

需求分析:要实现该功能,需选用 HDFSReader 和 MySQLWriter。

1)编写配置文件

vim test_province.json
{"job": {"content": [{"reader": {"name": "hdfsreader","parameter": {"defaultFS": "hdfs://hadoop102:8020","path": "/base_province","column": ["*"],"fileType": "text","compress": "gzip","encoding": "UTF-8","nullFormat": "\\N","fieldDelimiter": "\t",}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"table": ["test_province"],"jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"}],"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"writeMode": "replace"}}}],"setting": {"speed": {"channel": 1}}}
}

2)配置说明

hdfsreader:

mysqlwriter:

        其中 writeMode 的三种不同取值代表三种不同的 SQL 语句,其中 replace 和 on duplicate key update 都要求我们的 MySQL 表是有主键的。我们经常使用的 insert 语句是不需要主键的,所以当有主键重复的时候会直接报错。而 replace 语句如果遇到表中已经存在该主键的数据会直接替换掉, on duplicate key update 语句的话如果遇到表中已经存在该主键的数据会更新不同值的字段。

3)提交任务

创建 HDFS 输出端 MySQL 的表:

DROP TABLE IF EXISTS `test_province`;
CREATE TABLE `test_province`  (`id` bigint(20) NOT NULL,`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

提交任务:

bin/datax.py job/test_province.json

4)查看结果

5、DataX 优化

        上面 DataX 的任务配置文件中,job 下有两个参数 content 和 setting,content 是配置 reader 和 writer 的,而 setting 其实就是来给 DataX 优化用的(通过控制流量、并发)。

5.1、速度控制

        DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。

关键优化参数如下:

参数

说明

job.setting.speed.channel

并发数

job.setting.speed.record

总 record 限速(tps:条数/s)

job.setting.speed.byte

总 byte 限速(bps:字节数/s)

core.transport.channel.speed.record

单个 channel 的record限速,默认值为10000(10000条/s)

core.transport.channel.speed.byte

单个channel的byte限速,默认值1024*1024(1M/s)

注意事项:

1. 若配置了总 record 限速,则必须配置单个 channel 的 record 限速

2. 若配置了总 byte 限速,则必须配置单个 channe 的 byte 限速

3. 若配置了总 record 限速和总 byte 限速,channel 并发数参数就会失效。因为配置了总record限速和总 byte 限速之后,实际 channel 并发数是通过计算得到的:

计算公式为:

min(总byte限速/单个channel的byte限速,总record限速/单个 channel 的record限速)

配置示例:

{"core": {"transport": {"channel": {"speed": {"byte": 1048576 //单个channel byte限速1M/s}}}},"job": {"setting": {"speed": {"byte" : 5242880 //总byte限速5M/s}},...}
}

5.2、内存调整

        当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。

        建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。

        调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json

总结

        DataX 这个工具的学习就结束了,比我想象的要简单多,但是也需要好好熟悉练习一下。目前只学习了 DataX 在 HDFS 和 MySQL 之间的相互数据传递,以后用到其它框架的时候还需要精进一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2803134.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

STM32控制max30102读取血氧心率数据(keil5工程)

一、前言 MAX30102是一款由Maxim Integrated推出的低功耗、高精度的心率和血氧饱和度检测传感器模块,适用于可穿戴设备如智能手环、智能手表等健康管理类电子产品。 该传感器主要特性如下: (1)光学测量:MAX30102内置…

2024生物发酵展带您领略视觉盛宴-东滤器材

参展企业介绍 东滤器材(石家庄)有限公司是一家专注于微孔膜产品、深层过滤产品、纳米纤维产品、一次性过滤产品开发和应用的高科技企业,并于2022年顺利通过河北省“高新技术企业”权威认证。 公司拥有近两千平米符合GMP规范的十万级净化车间…

springmvc基于springboot 的音乐播放系统 _7sdu8

这就意味着音乐播放系统的设计可以比其他系统更为出色的能力,可以更高效的完成最新的ymj排行榜、ymj音乐资讯等功能。 此系统设计主要采用的是JAVA语言来进行开发,JSP技术、采用SSM框架技术,框架分为三层,分别是控制层Controller&…

NXP实战笔记(七):S32K3xx基于RTD-SDK在S32DS上配置ICU输入捕获

目录 1、概述 2、输入捕获SDK配置 2.1、SAIC中断方式 2.2、IPWM或者IPM 1、概述 输入捕获,可以抓取高电平时间、低电平时间、占空比、周期、边沿检测与回调函数、边沿计数(ABZ解码)、时间戳、唤醒中断。 记录一下根据Emios模块实现上述部分…

AGI|AI到底如何生成视频?Sora究竟为何能引爆科技圈?

目录 一、AI生成视频引发新浪潮 二、生成方法及难点 三、Sora的突破进展 (一)可生成不同尺寸视频 (二)可生成1分钟时长视频 (三)图片生成视频 (四)场景一致性 (五…

【C++私房菜】面向对象中的简单继承

文章目录 一、 继承基本概念二、派生类对象及派生类向基类的类型转换三、继承中的公有、私有和受保护的访问控制规则四、派生类的作用域五、继承中的静态成员 一、 继承基本概念 通过继承(inheritance)联系在一起的类构成一种层次关系。通常在层次关系的…

Jenkins2.426.3运行时提示:mvn: command not found

Jenkins运行时提示:mvn: command not found 第一步,查看服务器上是否已正确安装maven环境 $mvn --version 如果没有显示上面的信息,则需要重新安装maven环境后再往下进行 第二步:Jenkins配置Maven 例如:/usr/local/…

六、回归与聚类算法 - 欠拟合和过拟合

目录 1、定义 2、原因及解决方法 2.1 正则化 线性回归欠拟合与过拟合线性回归的改进 - 岭回归分类算法:逻辑回归模型保存与加载无监督学习:K-means算法 1、定义 2、原因及解决方法 2.1 正则化

洛谷B2008/2009 题解

#题外话(第35篇题解)(太简单,分两个于心不忍……)(C语言) #先看题目 2008: 2009: 题目链接: 2008https://www.luogu.com.cn/problem/B20082009https://www…

海思SD3403,SS928/926,hi3519dv500,hi3516dv500移植yolov7,yolov8(14)

自己挖了一个坑,准备做SS928/SD3403的Yolov8的移植,主要是后台私信太多人在问相关的问题。先别着急去写代码,因为在hi3516dv500下的移植还是比较顺利。之前在hi3519av100和hi3559av100系列时遇到过一些问题,所以没有继续去移植新的算法。 SS928架构乍一看和hi3559av100特别…

多来客资讯:本地生活服务平台加盟方法

本地生活一般涵盖了吃喝玩乐,而本地生活平台,则是指提供这些吃喝玩乐的互联网平台以及各大APP,比如饿了么、美团等等,这些都可以叫做本地生活服务平台。 因为这些平台都是主要做外卖、团购业务为主,所以,本…

Oracle迁移到mysql-导出mysql所有索引和主键

导出建库表索引等: [rootlnpg ~]# mysqldump -ugistar -pxxx -h192.168.207.143 --no-data -d lndb > lndb20230223-1.sql 只导出索引:参考:MYSQL导出现有库中的索引脚本_mysql 导出数据库所有表的主键和索引-CSDN博客 -- MYSQL导出现有…

项目:文本编辑器

文章目录 [toc] 文本编辑器1.项目概述1.1功能介绍1.2界面实现预览1.3界面设计简要介绍 2.设计流程2.1窗口图片,和标题更改2.1.1gui方式改变2.1.2代码方式更改2.2 QPushButton按钮设置样式表 2.2 功能实现2.2.1 打开读取文件2.2.2 打开保存文件2.2.3 文件关闭2.2.4 更…

Web 前端 UI 框架Bootstrap简介与基本使用

Bootstrap 是一个流行的前端 UI 框架,用于快速开发响应式和移动设备优先的网页。它由 Twitter 的设计师和工程师开发,现在由一群志愿者维护。Bootstrap 提供了一套丰富的 HTML、CSS 和 JavaScript 组件,可以帮助开发者轻松地构建和定制网页和…

【selenium】三大切换 iframe 弹窗alert 句柄window 和 鼠标操作

目录 一、iframe 1、切换方式: 1、第一种情况: 2、第二种情况: 方式1: 先找到iframe,定位iframe元素(可以通过元素定位的各种方式:xpath,css等等),用对象接收&…

Sora模型开启了AI视频模型的新篇章,将引领未来更多领域的创新和应用。

目录 一、Sora模型的工作原理 二、AI视频模型的无限可能性 1.视频编辑和创作 2.游戏和虚拟现实 3.教育和远程协作 4.娱乐和社交媒体 OpenAI最近推出了其首个AI视频模型Sora,这个模型能够生成逼真的视频,具有许多潜在的应用领域。本文将探讨Sora模型…

旅游景点旅行研学门票特产小程序开发

旅游景点旅行研学门票特产小程序开发 旅游线路智能推荐与精心规划,我们为用户提供丰富多样的旅游线路选择,助力您的行程安排更加顺畅无忧。 景点门票在线预订与购买功能,覆盖景区、博物馆、演出等各类门票。告别排队等待,一键操…

揭秘抖音自动评论软件的使用方法和步骤

**一、引言** 随着移动互联网的普及,抖音已经成为了人们日常生活中不可或缺的一部分。为了更好地利用抖音,我们今天就来探讨一下抖音自动评论软件的使用方法和步骤。本文将通过通俗易懂的语言,结合实际操作,帮助大家轻松掌握这一…

(十四)【Jmeter】线程(Threads(Users))之开放模型线程组(Open Model Thread Group)

简述 操作路径如下: 开放模型线程组(Open Model Thread Group) 是 JMeter 5.5 版本中引入的一个新特性,它允许用户创建具有可变负载的负载配置文件。相较于传统的线程组,开放模型线程组提供了更多的灵活性和动态调整的能力。 优点: 灵活性:允许测试人员根据测试需求动…

LED景观照明灯驱动电路串联、并联和恒流3款方案

LED景观照明灯是现代城市照明中常见的一种灯具。为了保证LED景观照明灯的正常工作,需要设计合适的驱动电路。LED景观照明灯的驱动电路可以采用串联、并联或恒流的方式来设计。 首先,串联驱动电路是指将多个LED灯串联在一起,然后接入电源进行…