4. 业务数据采集平台搭建

4. 业务数据采集平台搭建

  • 业务数据采集模块
    • Hive安装部署
    • 业务数据同步概述
      • 数据同步策略概述
      • 数据同步策略选择
      • 数据同步工具概述
    • DataX 数据同步工具
    • Maxwell 数据同步工具
    • 全量表数据同步
      • 数据通道
      • DataX 配置文件
      • DataX 配置文件生成脚本
        • 生成文件
        • 生成全部配置文件脚本
        • 测试生成的 DataX 配置文件
      • 全量表数据同步脚本
      • 全量表同步总结
    • 增量表数据同步
      • 数据通道
      • Maxwell 配置
        • 通道测试
      • Flume 配置
        • 创建 Flume 配置文件
        • 编写Flume拦截器
        • 编写 Flume 启停脚本
        • 通道测试
      • 增量表首日全量同步
      • 增量表同步总结
    • 行为采集数据
    • 业务数据采集

业务数据采集模块

Hive安装部署

https://blog.csdn.net/qq_44226094/article/details/123218860

业务数据同步概述

数据同步策略概述

每日定时从业务数据库抽取数据,传输到数据仓库中,之后再对数据进行分析统计

为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步,离线数仓的计算周期通常为,所以数据同步周期为天 ( 每天同步一次 )

数据的同步策略 :

  • 全量同步
  • 增量同步

全量同步 : 每天都将业务数据库中的全部数据同步一份到数据仓库,保证两侧数据同步的最简单的方式

在这里插入图片描述

增量同步 : 每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表 ( 首日一次全量同步 )

在这里插入图片描述

数据同步策略选择

两种策略对比 :

同步策略优点缺点
全量同步逻辑简单在某些情况下效率较低。例如某张表数据量较大,但是每天数据的变化比例很低,若对其采用每日全量同步,则会重复同步和存储大量相同的数据
增量同步效率高,无需同步和存储重复数据逻辑复杂,需要将每日的新增及变化数据同原来的数据进行整合,才能使用

结论:业务表数据量大,且每天数据变化低 ( 增量同步 ) ,否则 全量同步

各表同步策略:

全量 :

  • activity_info 活动表
  • activity_rule 优惠规则表
  • base_category1 商品一级分类
  • base_category2 商品二级分类
  • base_category3 商品三级分类
  • base_dic 编码字典表
  • base_province 省份表
  • base_region 地区表
  • base_trademark 品牌表
  • cart_info 加购表(特殊)
  • coupon_info 优惠卷表
  • sku_attr_value SKU平台属性表
  • sku_sale_attr_value SKU销售属性表
  • sku_info SKU商品表
  • spu_info SPU商品表

增量 :

  • cart_info 加购表 ( 特殊 )
  • comment_info 商品评论表
  • coupon_use 优惠卷领用表
  • favor_info 收藏表
  • order_detail_activity 订单明细活动关联表
  • order_detail_coupon 订单明细优惠卷关联表
  • order_detail 订单详情表
  • order_info 订单表
  • order_refund_info 退单表
  • order_status_log 订单状态表
  • payment_info 支付表
  • refund_payment 退款表
  • user_info 用户表

在这里插入图片描述

数据同步工具概述

数据同步工具 :

  • 离线、批量同步 : 基于 Select 查询 , DataX、Sqoop
  • 实时流式同步 : 基于 binlog , Maxwell、Canal
增量同步方案DataX / SqoopMaxwell / Canal
对数据库的要求数据表中存在create_time、update_time等字段,然后根据这些字段获取变更数据要求数据库记录变更操作,如 : MySQL开启 binlog
数据的中间状态获取最后一个状态,中间状态无法获取获取变更数据的所有中间状态

全量同步 : DataX

增量同步 : Maxwell

DataX 数据同步工具

https://blog.csdn.net/qq_44226094/article/details/123261959

Maxwell 数据同步工具

https://blog.csdn.net/qq_44226094/article/details/123319206

全量表数据同步

数据通道

全量表数据由 DataX 从 MySQL 业务数据库直接同步到 HDFS

在这里插入图片描述

目标路径中表名须包含后缀 full , 表示该表为全量同步
目标路径中包含一层日期 , 用以对不同天的数据进行区分

DataX 配置文件

每张全量表编写一个 DataX 的 json 配置文件

栗子 : activity_info 活动信息表

字段名字段说明类型
id活动idbigint(20)
activity_name活动名称varchar(200)
activity_type活动类型(1:满减,2:折扣)varchar(10)
activity_desc活动描述varchar(2000)
start_time开始时间datetime(0)
end_time结束时间datetime(0)
create_time创建时间datetime(0)
vim activity_info.json
{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["id","activity_name","activity_type","activity_desc","start_time","end_time","create_time"],"connection": [{"jdbcUrl": ["jdbc:mysql://cpucode102:3306/gmall"],"table": ["activity_info"]}],"password": "xxxxxx","splitPk": "","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [{"name": "id","type": "bigint"},{"name": "activity_name","type": "string"},{"name": "activity_type","type": "string"},{"name": "activity_desc","type": "string"},{"name": "start_time","type": "string"},{"name": "end_time","type": "string"},{"name": "create_time","type": "string"}],"compress": "gzip","defaultFS": "hdfs://cpucode101:8020","fieldDelimiter": "\t","fileName": "activity_info","fileType": "text","path": "${targetdir}","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

由于目标路径包含一层日期,用于对不同天的数据加以区分,故 path 参数并未写死,需在提交任务时通过参数动态传入,参数名称为 targetdir

在这里插入图片描述

创建 HDFS 文件

hadoop fs -mkdir -p /origin_data/gmall/db/activity_info_full/2020-06-14

在这里插入图片描述

在这里插入图片描述

数据同步

python bin/datax.py job/activity_info.json -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-14"

在这里插入图片描述

在这里插入图片描述

DataX 配置文件生成脚本

DataX 配置文件批量生成脚本

Datax 往 hdfs 写数据配置 HA 高可用 : https://cpucode.blog.csdn.net/article/details/123824203

gen_import_config.py 脚本

vim gen_import_config.py 
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb#MySQL相关配置,需根据实际情况作出修改
mysql_host = "cpucode102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "xxxxx"#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "cpucode101"
hdfs_nn_port = "8020"#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/import"#获取mysql连接
def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)#获取表格的元数据  包含列名和数据类型
def get_mysql_meta(database, table):connection = get_connection()cursor = connection.cursor()sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"cursor.execute(sql, [database, table])fetchall = cursor.fetchall()cursor.close()connection.close()return fetchall#获取mysql表的列名
def get_mysql_columns(database, table):return map(lambda x: x[0], get_mysql_meta(database, table))#将获取的元数据中 mysql 的数据类型转换为 hive 的数据类型  写入到 hdfswriter 中
def get_hive_columns(database, table):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string"}return mappings[mysql_type]meta = get_mysql_meta(database, table)return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)#生成json文件
def generate_json(source_database, source_table):job = {"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table),"splitPk": "","connection": [{"table": [source_table],"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"fileType": "text","path": "${targetdir}","fileName": source_table,"column": get_hive_columns(source_database, source_table),"writeMode": "append","fieldDelimiter": "\t","compress": "gzip"}}}]}}if not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:json.dump(job, f)def main(args):source_database = ""source_table = ""options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])for opt_name, opt_value in options:if opt_name in ('-d', '--sourcedb'):source_database = opt_valueif opt_name in ('-t', '--sourcetbl'):source_table = opt_valuegenerate_json(source_database, source_table)if __name__ == '__main__':main(sys.argv[1:])

在这里插入图片描述

安装 Python Mysql 驱动

http://mirrors.163.com/centos/7/os/x86_64/Packages/

在这里插入图片描述

把文件上传到 /opt/software

sudo rpm -ivh MySQL-python-1.2.5-1.el7.x86_64.rpm

在这里插入图片描述

权限 :

chmod 777 gen_import_config.py

在这里插入图片描述

脚本使用说明

python gen_import_config.py -d database -t table
  • -d : 数据库名
  • -t : 表名

生成文件

python gen_import_config.py -d gmall -t activity_info

在这里插入图片描述

文件在 /opt/module/datax/job/import

在这里插入图片描述

数据进行同步

python bin/datax.py job/import/gmall.activity_info.json -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-14"

在这里插入图片描述

在这里插入图片描述

生成全部配置文件脚本

创建 gen_import_config.sh 脚本

vim gen_import_config.sh
#!/bin/bashpython ~/bin/gen_import_config.py -d gmall -t activity_info
python ~/bin/gen_import_config.py -d gmall -t activity_rule
python ~/bin/gen_import_config.py -d gmall -t base_category1
python ~/bin/gen_import_config.py -d gmall -t base_category2
python ~/bin/gen_import_config.py -d gmall -t base_category3
python ~/bin/gen_import_config.py -d gmall -t base_dic
python ~/bin/gen_import_config.py -d gmall -t base_province
python ~/bin/gen_import_config.py -d gmall -t base_region
python ~/bin/gen_import_config.py -d gmall -t base_trademark
python ~/bin/gen_import_config.py -d gmall -t cart_info
python ~/bin/gen_import_config.py -d gmall -t coupon_info
python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config.py -d gmall -t sku_info
python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config.py -d gmall -t spu_info

在这里插入图片描述

gen_import_config.sh 脚本增加执行权限

chmod 777 gen_import_config.sh

在这里插入图片描述

执行 gen_import_config.sh 脚本,生成配置文件

gen_import_config.sh

配置文件 :

ll /opt/module/datax/job/import/

在这里插入图片描述

测试生成的 DataX 配置文件

例子 : activity_info

目的 : 测试用脚本生成的配置文件是否可用

创建目标路径

DataX 同步任务要求目标路径提前存在,故需手动创建路径,当前 activity_info 表的目标路径应为 /origin_data/gmall/db/activity_info_full/2020-06-14

hadoop fs -mkdir -p /origin_data/gmall/db/activity_info_full/2020-06-15

在这里插入图片描述

在这里插入图片描述

执行DataX同步命令

python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-15" /opt/module/datax/job/import/gmall.activity_info.json

在这里插入图片描述

观察同步结果

观察 HFDS 目标路径是否出现数据

http://cpucode101:9870/explorer.html#/origin_data/gmall/db/activity_info_full/2020-06-15

在这里插入图片描述

全量表数据同步脚本

全量表数据同步脚本 mysql_to_hdfs_full.sh

vim mysql_to_hdfs_full.sh 
#!/bin/bashDATAX_HOME=/opt/module/datax# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;thendo_date=$2
elsedo_date=`date -d "-1 day" +%F`
fi#处理目标路径,此处的处理逻辑是,
#如果目标路径不存在,则创建;
#若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {hadoop fs -test -e $1if [[ $? -eq 1 ]]; thenecho "路径$1不存在,正在创建......"hadoop fs -mkdir -p $1elseecho "路径$1已经存在"fs_count=$(hadoop fs -count $1)content_size=$(echo $fs_count | awk '{print $3}')if [[ $content_size -eq 0 ]]; thenecho "路径$1为空"elseecho "路径$1不为空,正在清空......"hadoop fs -rm -r -f $1/*fifi
}#数据同步
import_data() {datax_config=$1target_dir=$2handle_targetdir $target_dirpython $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}case $1 in
"activity_info")import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date;;
"activity_rule")import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date;;
"base_category1")import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date;;
"base_category2")import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date;;
"base_category3")import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date;;
"base_dic")import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date;;
"base_province")import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date;;
"base_region")import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date;;
"base_trademark")import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date;;
"cart_info")import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date;;
"coupon_info")import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date;;
"sku_attr_value")import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date;;
"sku_info")import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date;;
"sku_sale_attr_value")import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date;;
"spu_info")import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date;;
"all")import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_dateimport_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_dateimport_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_dateimport_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_dateimport_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date;;
esac

在这里插入图片描述

mysql_to_hdfs_full.sh 增加执行权限

chmod 777 mysql_to_hdfs_full.sh

在这里插入图片描述

测试同步脚本

mysql_to_hdfs_full.sh all 2020-06-14

在这里插入图片描述

检查同步结果

查看 HDFS 目表路径是否出现全量表数据,全量表共 15 张

在这里插入图片描述

全量表同步总结

全量表同步逻辑简单,只需每日执行全量表数据同步脚本 mysql_to_hdfs_full.sh

增量表数据同步

数据通道

在这里插入图片描述

目标路径中表名须包含后缀 inc,为增量同步
目标路径中包含一层日期,用以对不同天的数据进行区分

Maxwell 配置

cart_infocomment_info 等共计13张表需进行增量同步,Maxwell 同步 binlog 中的所有表的数据变更记录

为方便下游使用数据, Maxwell 将不同表的数据发往不同的 Kafka Topic

修改 Maxwell 配置文件 config.properties

vim /opt/module/maxwell-1.29.2-study/config.properties
log_level=infoproducer=kafka
kafka.bootstrap.servers=cpucode101:9092,cpucode102:9092#kafka topic动态配置
kafka_topic=%{table}# mysql login info
host=cpucode102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai#表过滤,只同步特定的13张表
filter= include:gmall.cart_info,include:gmall.comment_info,include:gmall.coupon_use,include:gmall.favor_info,include:gmall.order_detail,include:gmall.order_detail_activity,include:gmall.order_detail_coupon,include:gmall.order_info,include:gmall.order_refund_info,include:gmall.order_status_log,include:gmall.payment_info,include:gmall.refund_payment,include:gmall.user_info

在这里插入图片描述

重新启动 Maxwell

mxw.sh restart

在这里插入图片描述

通道测试

启动 Zookeeper 和 Kafka 集群

Zookeeper 分布式安装

https://blog.csdn.net/qq_44226094/article/details/123119682

Kafka 分布式安装部署 :

https://blog.csdn.net/qq_44226094/article/details/123121544

启动一个 Kafka Console Consumer,消费任一 topic 数据

kafka-console-consumer.sh --bootstrap-server cpucode101:9092 --topic cart_info

生成模拟数据

cd /opt/module/db_log/
java -jar gmall2020-mock-db-2021-11-14.jar 

在这里插入图片描述

观察Kafka消费者是否能消费到数据

在这里插入图片描述

Flume 配置

Flume 需要将 Kafka 中各 topic 的数据传输到 HDFS,故其需选用 :

  • KafkaSource
  • HDFSSink
  • Channe 选用 FileChanne

KafkaSource 需订阅 Kafka 中的 13 个 topic,HDFSSink 需要将不同 topic 的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据

配置要点 :

KafkaSource

#订阅13个topic
kafka.topics =
cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_ status_log,payment_info,refund_payment,user_info#为Event增加一个header,key为topic,value为Event来自的Kafka Topic。
setTopicHeader = true
topidHeader = topic#自定义时间戳拦截器为Event增加一个header,key 为timestamp,value为json字符串中ts字段的值
interceptors = il
interceptors.i1.type = TimeStampInterceptor.Builder

HDFSSink

#path中包含自定义转义序列和时间转移序列,用于将不同topic的数据放到不同的路径,以及不同日期的数据放到不同的路径
path=/origin_data/gmall/db/%{topic}_inc/%Y-%m-%d

数据实例 :

在这里插入图片描述

创建 Flume 配置文件

Flume 的 job 目录下创建 kafka_to_hdfs_db.conf

vim job/kafka_to_hdfs_db.conf

配置文件内容 :

a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cpu101:9092,cpu102:9092
a1.sources.r1.kafka.topics = cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.cpucode.flume.interceptor.db.TimestampInterceptor$Buildera1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1123456
a1.channels.c1.keep-alive = 6## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

在这里插入图片描述

分发 :

xsync job/

在这里插入图片描述

编写Flume拦截器

新建一个Maven项目

pom.xml 文件 :

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
</dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

在这里插入图片描述

com.cpucode.flume.interceptor.db 包下创建 TimestampInterceptor

package com.cpucode.flume.interceptor.db;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** @author : cpucode* @date : 2022/3/12 14:15* @github : https://github.com/CPU-Code* @csdn : https://blog.csdn.net/qq_44226094*/
public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);Long ts = jsonObject.getLong("ts");//Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒String timeMills = String.valueOf(ts * 1000);headers.put("timestamp", timeMills);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}

打好的包放入到 cpu103 的 /opt/module/flume-1.9.0/lib 文件夹下

ls | grep flumeETL-3.1.0-jar-with-dependencies.jar

在这里插入图片描述

编写 Flume 启停脚本

/home/cpu/bin 目录下创建脚本 f3.sh

vim f3.sh
#!/bin/bashcase $1 in
"start")echo " --------启动 cpu103 业务数据flume-------"ssh cpu103 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")echo " --------停止 cpu103 业务数据flume-------"ssh cpu103 "ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

在这里插入图片描述

脚本执行权限

chmod 777 f3.sh

在这里插入图片描述

通道测试

启动 Zookeeper、Kafka 集群

Zookeeper 分布式安装

https://blog.csdn.net/qq_44226094/article/details/123119682

Kafka 分布式安装部署 :

https://blog.csdn.net/qq_44226094/article/details/123121544

f3启动

f3.sh start

在这里插入图片描述

生成模拟数据

java -jar gmall2020-mock-db-2021-11-14.jar

在这里插入图片描述

HDFS 上的目标路径是否有数据出现

在这里插入图片描述

在这里插入图片描述

数据目标路径的日期说明 :

发现目标路径中的日期,并非模拟数据的业务日期,而是当前日期

在这里插入图片描述

为了模拟真实环境 , 修改 Maxwell 配置文件 config.properties ,增加 mock_date 参数

#该日期须和 /opt/module/db_log/application.properties 中的 mock.date 参数保持一致
mock_date=2020-06-14

在这里插入图片描述

仅供学习使用,修改该参数后重启Maxwell才可生效

重启Maxwell

mxw.sh restart

在这里插入图片描述

重新生成模拟数据

java -jar gmall2020-mock-db-2021-11-14.jar

观察HDFS目标路径日期是否正常

在这里插入图片描述

增量表首日全量同步

增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用 Maxwell 的 bootstrap 功能

mysql_to_kafka_inc_init.sh

vim mysql_to_kafka_inc_init.sh
#!/bin/bash# 该脚本的作用是初始化所有的增量表,只需执行一次MAXWELL_HOME=/opt/module/maxwell-1.29.2-studyimport_data() {$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}case $1 in
"cart_info")import_data cart_info;;
"comment_info")import_data comment_info;;
"coupon_use")import_data coupon_use;;
"favor_info")import_data favor_info;;
"order_detail")import_data order_detail;;
"order_detail_activity")import_data order_detail_activity;;
"order_detail_coupon")import_data order_detail_coupon;;
"order_info")import_data order_info;;
"order_refund_info")import_data order_refund_info;;
"order_status_log")import_data order_status_log;;
"payment_info")import_data payment_info;;
"refund_payment")import_data refund_payment;;
"user_info")import_data user_info;;
"all")import_data cart_infoimport_data comment_infoimport_data coupon_useimport_data favor_infoimport_data order_detailimport_data order_detail_activityimport_data order_detail_couponimport_data order_infoimport_data order_refund_infoimport_data order_status_logimport_data payment_infoimport_data refund_paymentimport_data user_info;;
esac

在这里插入图片描述

mysql_to_kafka_inc_init.sh 增加执行权限

chmod 777 mysql_to_kafka_inc_init.sh

在这里插入图片描述

清理历史数据

hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

在这里插入图片描述

执行同步脚本

mysql_to_kafka_inc_init.sh all 

在这里插入图片描述

观察HDFS上是否重新出现增量表数据

在这里插入图片描述

增量表同步总结

增量表同步,需要在首日进行一次全量同步,后续每日才是增量同步

首日进行全量同步时,需先启动数据通道,包括 Maxwell、Kafka、Flume,然后执行增量表首日同步脚本 mysql_to_kafka_inc_init.sh 进行同步

每日只需保证采集通道正常运行即可,Maxwell 会实时将变动数据发往 Kafka

行为采集数据

启动 f1 , kafka , f2

数据是动态监控本地磁盘文件的 ,如果生产数据 ,会被发送到对应的 HDFS 文件夹中

启动所有的服务之后调用 lg.sh 模拟生成行为数据

如果需要生产 6月15号 的数据 ,只需要修改 application.yml 文件中的参数 之后再执行 lg.sh

在这里插入图片描述

业务数据采集

修改版的 maxwell , 可以手动控制 json 中的时间

同步数据:

  • 使用 gen_import_config.py 脚本能传入库名和表名生产对应的 json 文件
  • 使用 gen_import_config.sh 脚本一次性生成全部全量表的 json 文件 ( 前面两步只需要操作一次 以后再使用都不需要重复操作 )
  • 使用同步数据脚本 mysql_to_hdfs_full.sh all 日期 ( 必须保证数据生产一天 导入一天的 不能一次性把数据全部生产)

同步数据:

  • 启动 maxwell , f3 , kafka
  • 首日同步使用 maxwell-bootstrap 功能 直接用脚本
    mysql_to_kafka_inc_init.sh all 不能填写日期 , 因为日期在 maxwell 的配置文件中写死了

每日同步 :

  • 修改 maxwell 的配置文件 , 将日期修改为 06-15 之后重启 maxwell
  • 修改 application.properties 文件 , 将日期修改为 06-15 同时将重置内容设置为 0 , 不再重置 , 之后调用 java -jar gmall2020-mock-db-2021-11-14.jar 生产数据 maxwell 会自动监控完成同步

在这里插入图片描述

电商数据仓库系统 :

https://blog.csdn.net/qq_44226094/article/details/123013113

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/143246.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

高德新版全类别AOI采集与分析

8 高德AOI采集与分析 8.1 AOI是什么 AOI是兴趣面&#xff08;area of interest&#xff0c;简称AOI&#xff09;&#xff0c;也叫信息面&#xff0c;指的是地图数据中的区域状的地理实体。 在主流互联网电子地图中&#xff0c;POI&#xff08;Point of Interest&#xff09;指兴…

大数据数据采集工具简介

随着大数据技术体系的发展&#xff0c;越来越多的企业应用大数据技术支撑自己的业务发展。数据采集作为大数据的起点&#xff0c;是企业主动获取数据的一种重要手段。数据采集的多样性、全面性直接影响数据质量。 企业获取数据的渠道分为内部和外部两个渠道。内部渠道包含自己建…

猿创征文|大数据开发必备的数据采集工具汇总

文章目录 前言1、Flume适用场景工作方式 2、Flink CDC适用场景工作方式 3、Sqoop适用场景工作方式 4、Canal适用场景工作方式 5、Kettle适用场景工作方式 前言 随着大数据近几年的发展&#xff0c;已经在国内外的开发市场积累出一大批大数据开发的技术型人才&#xff0c;不论是…

【高数+复变函数】傅里叶变换的性质

文章目录 【高数复变函数】傅里叶变换的性质一、常见性质1.1 线性性质1.2 位移性质1.3 微分性质1.4 积分性质1.5 乘积定理1.6 能量积分 二、卷积2.1 卷积运算2.2 运算应用2.3 卷积定理 三、相关函数 【高数复变函数】傅里叶变换的性质 上一节&#xff1a;【高数复变函数】傅里…

怎么利用计算机向邮箱传输文件,如何使用QQ邮箱发送整个文件夹-电脑自学网

QQ邮箱只能发送单个文件&#xff0c;那么该如何使用QQ邮箱发送整个文件夹呢&#xff1f;其实方法非常简单&#xff0c;通过压缩文件夹就可以了&#xff0c;下面请看具体操作。 解决办法&#xff1a; 1、打开我们电脑&#xff0c;在磁盘中找到我们等下要在邮箱中进行发送的文件夹…

云服务器如何发送邮件

在共有云上目前都是封锁25端口的&#xff0c;也就是说想要在服务器上搭建SMTP是不可能的&#xff0c;除非申请解封25端口。 但是一般情况为了方便使用&#xff0c;可以直接选择第三方的SMTP服务器&#xff0c;如163&#xff0c;qq等等&#xff0c;省去自建的麻烦。 操作如下&a…

给别人发邮件怎么发

现如今是互联网时代&#xff0c;电子邮箱的普及率几乎已经到了“人手一个”的情况&#xff0c;而除了个人邮箱之外&#xff0c;很多职场人还会配备专门的工作邮箱。但由于工作邮件的专业性&#xff0c;它的发送与随意的个人邮件有一定区别&#xff0c;如何礼貌地给别人发邮件怎…

qq邮箱服务器接收和发送文件夹,将QQ邮箱打造成为你的邮箱总管-qq邮箱怎么发送文件夹...

其实邮箱多了也不是一件好事&#xff0c;每次进入QQ邮箱时&#xff0c;都要先登录到相应的网站&#xff0c;而且邮箱多了&#xff0c;用户名和密码也容易忘记。那有没有什么办法&#xff0c;不用登陆各个网站就能同时收各个邮箱的邮件呢&#xff01;有人可能会说使用foxmail或者…

计算机 桌面上的文件怎么发送,文本文件如何发送到QQ邮箱里?

2017-11-02 怎么恢复右键菜单 向右键菜单发送到中添加二级菜单的方法 方法一&#xff1a; 打开同版本系统的C:Users用户名AppDataRoamingMicrosoftWindowsSendTo目录&#xff0c;复制全部内容&#xff0c;然后将复制内容粘贴到问题电脑同名文件夹即可。 方法二&#xff1a; 1、…

qq邮箱怎么发送html文件在哪里,QQ邮箱怎么发送文件夹

人们在发送qq邮件时希望把文件夹的内容全部发送过去&#xff0c;可是qq邮箱都没有发送文件夹的功能&#xff0c;那么文件夹中所有的文件就只有一张一张添加附件进行发送吗?其实可以把文件压缩后进行发送。QQ邮箱怎么发送文件夹?下面一起来看看qq邮箱发送文件夹教程。 操作步骤…

后台录制直播视频

1、下载软件https://pan.baidu.com/s/1nvB_qa9fS_njpQ5TPy9oGQ 密码&#xff1a;8888 2、打开有直播的网站&#xff0c;以抖音为例 3、按住F12进出开发者模式→选中网络→过滤输入&#xff1a;flv→在过滤结果&#xff08;成功&#xff09;处复制网址&#xff1b;如下图 4、…

网页视频怎么录制?这两个方法操作简单,录制高清!

网页视频怎么录制&#xff1f;当大家在观看网页上的视频、直播、网课时&#xff0c;想要将视频录制下来&#xff0c;结果发现很多视频设置了权限&#xff0c;不允许下载。这个时候就需要用到好用的录屏神器啦&#xff0c;既可以将视频录制下来&#xff0c;还能最大程度还原清晰…

如何用计算机录视频,如何在电脑上录制正在播放的视频?原来方法这么简单

如何在电脑上录制正在播放的视频&#xff1f;原来方法这么简单 2019年10月14日 12:15作者&#xff1a;黄页编辑&#xff1a;黄页 分享 如何在电脑上录制正在播放的视频?网页视频是当今比较方便的一种视频播放形式&#xff0c;无需下任何软件即可在线观看视频&#xff0c;如看电…

有没有什么免费的网页视频录制软件?PC端视频录制软件集合

有没有什么免费的网页视频录制软件?说起视频录制软件&#xff0c;应给没有人不知道吧&#xff0c;通过第三方工具将发生在屏幕上的视频录制下来&#xff0c;但是这样的软件工具非常多。下面就给大家介绍一下这3款软件! 超级捕快&#xff1a;点击左侧链接下载 超级捕快是一款…

Android中小视频录制,预览

工作中项目需求&#xff0c;在动态里面支持查看录制的小视频&#xff1b;录制视频主要用到Android中MediaRecorder这个类&#xff0c;在录制的过程中需要结合SurfaceView和Camera&#xff0c;Camera管理手机摄像头&#xff0c;SurfaceView负责将Camera捕捉到的图像渲染出来&…

如何下载网页上的视频

方法一&#xff1a;使用 Firefox浏览器中Video DownloadHelper扩展 下载知乎视频 方法二&#xff1a;使用 https://xbeibeix.com/api/bilibili/ 下载B站biliili的视频 1&#xff0c;使用GitHub项目you-get&#xff0c;中文解释 cd 需要下载的目录为止&#xff08;下载的文件在…

html实现视频录制,保存和回放

录制和保存视频&#xff0c;通过三个按钮配合完成。 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> <body ><div style"width: 80%; margin: 0 au…

视频录制软件哪个好,推荐几款简单实用的视频课件录制软件

日常生活中&#xff0c;我们有时候会因为工作或学习的原因&#xff0c;会使用到一些视频录制软件&#xff0c;通过视频录制软件&#xff0c;我们可以记录一些错过的细节&#xff0c;提高学习或工作效率。当然&#xff0c;现在网上的视频录制软件那么多&#xff0c;到底视频录制…

视频录制工具OBS选择区域录制

视频录制工具OBS录制时默认是对整个屏幕区域进行录制的&#xff0c;如果我们想只是录取某一屏幕区域&#xff0c;就需要进行一些修改。 相关的修改步骤如下&#xff1a; &#xff08;1&#xff09;新建一个“显示器采集”。 在弹出的对话框中&#xff0c;指定名字。 接着&…

基于Camera2和MediaRecorder实现视频录制

一、概述 视频录制&#xff0c;在一般开发中很少遇到&#xff0c;大部分开发工作都是写写页面&#xff0c;请求接口&#xff0c;展示数据等等。真要遇到&#xff0c;可能采用第三方库实现&#xff0c;一来实现快速&#xff0c;二来可能觉得别人实现的比较好。特别是在开发周期…