数据同步工具之DataX实操

一、DataX部署

  1. 上传DataX压缩文件至/opt/software/目录下,并解压文件至/opt/module/下。
    在这里插入图片描述

  2. 自测检查DataX,出现如下截图内容,说明安装成功
    在这里插入图片描述
    二、DataX使用

  3. DataX使用概述
    DataX使用还是十分简单的,用户只需要根据自己同步数据的数据源和目的地来选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行下述命令进行提交数据同步任务即可。

[yili@hadoop102 datax]$ python bin/datax.py job/job.json
  1. DataX配置文件格式
    可以通过下述命令进行查看DataX配置文件模板
[yili@hadoop102 datax]$ python bin/datax.py -d mysqlreader -w hdfswriter

https://gitee.com/mirrors/DataX/blob/master/README.md

  1. 同步MySQL数据到HDFS案例
    (1)案例要求:同步gmall数据库中base_province表数据到HDFS的/base_province目录
    (2)需求分析:要实现该功能,需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。
    下面分别使用两种模式进行演示。
    1)MySQLReader之TableMode
    Json配置文件内容:
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"where": "id>=3","connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"table": ["base_province"]}],"password": "123456","splitPk": "","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/user/yili/gmall/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

在这里插入图片描述

2)MySQLReader之QuerySQLMode
Json配置文件内容:

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/base_province","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}
  1. DataX传参
    【说明】通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
    DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值。
    1)具体示例如下:
[yili@hadoop102 datax]$ hadoop fs -mkdir /user/yili/gmall/base_province/2020-07-08
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"connection": [{"jdbcUrl": ["jdbc:mysql://hadoop102:3306/gmall"],"querySql": ["select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"]}],"password": "123456","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "name","type": "string"},{"name": "region_id","type": "string"},{"name": "area_code","type": "string"},{"name": "iso_code","type": "string"},{"name": "iso_3166_2","type": "string"}],"compress": "gzip","defaultFS": "hdfs://hadoop102:8020","fieldDelimiter": "\t","fileName": "base_province","fileType": "text","path": "/user/yili/gmall/base_province/${dt}","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

2)执行脚本

[yili@hadoop102 datax]$ python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json

3)查看结果

[yili@hadoop102 datax]$ hadoop fs -ls  /user/yili/gmall/base_province/
Found 2 items
drwxr-xr-x   - atguigu supergroup          0 2022-07-08 21:41 /user/yili/gmall/base_province/2020-07-08
  1. 同步HDFS数据到MySQL案例实操
    (1)案例要求:同步HDFS上的/user/yili/gmall/base_province目录下的数据到MySQL gmall 数据库下的test_province表。
    (2)需求分析:要实现该功能,需选用HDFSReader和MySQLWriter。
    配置文件如下所示:
{"job": {"content": [{"reader": {"name": "hdfsreader","parameter": {"defaultFS": "hdfs://hadoop102:8020","path": "/base_province","column": ["*"],"fileType": "text","compress": "gzip","encoding": "UTF-8","nullFormat": "\\N","fieldDelimiter": "\t",}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","connection": [{"table": ["test_province"],"jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"}],"column": ["id","name","region_id","area_code","iso_code","iso_3166_2"],"writeMode": "replace"}}}],"setting": {"speed": {"channel": 1}}}
}

(3)查看Mysql目标表输出
在这里插入图片描述

  1. DataX配置文件生成脚本(改进:对json串输出增加格式化操作 -> 增加程序文件内容可读性)
    (1)[yili@hadoop102 datax]$ vi /home/yili/bin/gen_import_config.py
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb#MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "Wkz57Heu(h"#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/import"#获取mysql连接
def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)#获取表格的元数据  包含列名和数据类型
def get_mysql_meta(database, table):connection = get_connection()cursor = connection.cursor()sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"cursor.execute(sql, [database, table])fetchall = cursor.fetchall()cursor.close()connection.close()return fetchall#获取mysql表的列名
def get_mysql_columns(database, table):return map(lambda x: x[0], get_mysql_meta(database, table))#将获取的元数据中mysql的数据类型转换为hive的数据类型  写入到hdfswriter中
def get_hive_columns(database, table):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string"}return mappings[mysql_type]meta = get_mysql_meta(database, table)return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)#生成json文件
def generate_json(source_database, source_table):job = {"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table),"splitPk": "","connection": [{"table": [source_table],"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"fileType": "text","fileName": source_table,"column": get_hive_columns(source_database, source_table),"writeMode": "append","fieldDelimiter": "\t","compress": "gzip","path": "${targetdir}"}}}]}}if not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:json.dump(job, f, sort_keys=True, indent=4, separators=(',', ':'))def main(args):source_database = ""source_table = ""options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])for opt_name, opt_value in options:if opt_name in ('-d', '--sourcedb'):source_database = opt_valueif opt_name in ('-t', '--sourcetbl'):source_table = opt_valuegenerate_json(source_database, source_table)if __name__ == '__main__':main(sys.argv[1:])

(2)执行脚本

[yili@hadoop102 bin]$ python gen_import_config.py -d gmall -t activity_info

(3)生成json结果文件

[yili@hadoop102 datax]$ python bin/datax.py job/import/gmall.activity_info.json -p "-Dtargetdir=/user/yili/gmall/db/activity_info_full/2022-07-06"

(4)执行日志显示成功
在这里插入图片描述
(5)HDFS上生成文件
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/142839.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

数据同步工具Sqoop

大数据Hadoop之——数据同步工具Sqoop - 掘金 (juejin.cn) 1 概述 Apache Sqoop(SQL-to-Hadoop)项目旨在协助RDBMS(Relational Database Management System:关系型数据库管理系统)与Hadoop之间进行高效的大数据交流。…

本地与服务器文件同步软件哪个好,同步软件哪个好,亲身体验的3款免费同步软件介绍...

数据同步在某些方面是非常的重要,特别是重要数据,做为服务器运维这方面工作的同学应该是深有体会,小编从事运维工作一年中共接触了3款同步软件,每一款都用了一段时间,算是有点小心得,所以分享给大家&#x…

数据同步工具—DataX 初识

DataX 初识 DataX 是阿里云 DataWorks数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数…

DBSync数据库同步工具

通用型的同步软件,支持SQL、NoSQL各种数据库,用于系统对接、数据备份、异地传输等。 文章目录 通用型的同步软件,支持SQL、NoSQL各种数据库,用于系统对接、数据备份、异地传输等。一、功能特点非侵入式,独立运行支持各…

正版授权 知名专业数据备份和数据同步软件工具 - GoodSync

GoodSync 软件简介 GoodSync 是一款知名的数据备份和数据同步软件工具,可以在多台电脑之间、电脑和移动存储设备之间,或者云存储之间,亦或是电脑的本地文件夹之间来进行数据双向同步或单向备份,支持 Windows、Mac 以及 Linux 平台…

GoodSync 数据自动同步备份工具VS傲梅轻松备份系统数据备份工具 哪款更好?

备份和同步软件的使用越来越广泛,因为在这个数据驱动的时代,数据的备份和恢复非常重要。在这里我想向大家推荐两款备份和同步软件——GoodSync和傲梅轻松备份。 GoodSync是一款备份和同步软件,它可以在多个设备之间同步文件、文件夹、照片、音…

Android开发环境搭建[Java1.7+eclipse+sdk4.0](某高校物联网工程专业必看!)

Android开发环境搭建[Java1.7eclipsesdk4.0](某高校物联网工程专业必看!) 0. 前言1. 资料拷贝2. 配置环境变量2.1新建 Java_Home2.2 编辑Path情况1情况2 2.3 新建sdk环境变量 3. 验证安装3.1 验证java安装情况3.2 验证eclipse安装 4. 导入已有…

mysql-索引_MySQL-索引

mysql-索引 MySQL-索引 (MySQL - INDEXES) A database index is a data structure that improves the speed of operations in a table. Indexes can be created using one or more columns, providing the basis for both rapid random lookups and efficient ordering of acc…

MySql的索引?

MySql的索引? 1 Hash索引? 通过hashCode去匹配,查找数据库中唯一值的速度很快,不支持范围查找,联合索引也不支持 只适用于select * from table where id 5;只适用于等于的情况 2红黑树? 数据量大的情况下,红黑树的树太高了,查询最深处的数据时,磁盘读取次数较多 3 B树? …

mysql中索引

一、概述 1、What? 索引是对数据库表中一列或多列的值进行排序的的一种结构,可以提高数据库中特定的数据查询速度。 索引时一个单独存储在磁盘上的数据库结构,包括对数据表里面的所有记录的引用指针。 索引时在存储引擎中实现的,…

mysql 之索引

什么是索引: 索引是一种高效获取数据的 存储结构,一般包含了 hash 二叉树 红黑树。 但是mysql中索引一般使用的是B树 准确说是使用的B树构建的索引:若仅仅是进行select * from table where id 1,用上述的三种方法都会很轻松的实…

Mysql__索引

1)索引问题----组合索引 最左前缀匹配原则 在mysql建立联合索引时会遵循最左前缀匹配的原则,即最左优先,在检索数据时从联合索引的最左边开始匹配 ALTER TABLE index ADD INDEX test_AA_BB_CC_DD (AA,BB,CC,DD);SHOW INDEX FROM index;EXPLAIN SELECT …

【Mysql 索引】

索引的基本知识 1. 索引介绍 索引的出现就是为了提高数据检索效率,就跟书的目录一样。索引不但在内存中,还写在硬盘中。索引是存储引擎实现的。 2. 索引常见模型 搜索树: 每个节点左儿子小于父节点,父节点小于右节点. select/update 复杂…

Mysql、索引

索引 数据库中的查询操作非常普遍,索引就是提升查找速度的一种手段 索引的类型 从数据结构角度分 1.B索引:传统意义上的索引,最常用最普遍的索引2.hash索引:hash索引是一种自适应的索引,数据库会根据表的使用情况自动生…

MySQL—索引

索引是什么? 索引是一种特殊的文件,它们包含着对数据表里所有记录的引用指针。 索引是一种数据结构,是数据库管理系统中一个排序的数据结构,以协助快速查询、更新数据表中的数据。通俗来说,索引相当与目录&#xff0c…

MySQL的索引有哪些

一、索引是什么# 索引,在MySQL中也叫“键(key)”,是存储引擎用于快速找到记录的一种数据结构。如果把数据库的一张表比作一本书,那索引则是这本书的目录,通过目录,我们能快速找到我们想要的主题…

mysql 的 索引

「深度学习福利」大神带你进阶工程师,立即查看>>> 1 什么是索引? 索引是数据库管理系统中一个排序的数据结构,以协助快速查询、更新数据库表中数据。 索引的实现 通常使用B树及其变种B树。 索引相当于字典的目录,作用…

Mysql高性能索引

一、索引是什么 二、索引的底层实现原理 三、InnoDB的存储结构是怎样的? 四、InnoDB索引和MyIsam索引对比 五、Mysql为什么会选错索引 六、唯一索引和普通索引的区别 导读:本博文讲解了索引是什么和索引的底层原理,提到了 BTREE和 BTREE hash底层…

MySQL:索引

一、索引的常见模型 索引的出现是为了提高数据查询的效率。实现索引的方式有很多种,比较常见的数据结构有:哈希表、有序数组和搜索树。 索引是在存储引擎层实现的,不同存储引擎索引工作方式不同。 1.1 哈希表 哈希表:键值存储…

【MySQL】MySQL的索引

目录 索引1.1 概念1.2 作用1.3 使用场景1.4 使用1.5 索引最常用的数据结构 索引 1.1 概念 索引是一种特殊的文件,包含着对数据表里所有记录的引用指针。可以对表中的一列或多列创建索引, 并指定索引的类型,各类索引有各自的数据结构实现。 …