flinkcdc 3.0 源码学习之任务提交脚本flink-cdc.sh

大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读.
在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务,
然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交,
本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命令+yaml文件将任务提交,其他的功能会在之后的文章中解读
大数据小菜鸡在努力学习中,文中内容有误多多指点.

目录

概述
流程图
flink-cdc.sh解读
完整代码
逐行解读
参考

概述

首先需要思考一下,如果是自己来实现这一效果,那么应该如何设计,用什么技术?

我们知道flinkcdc的同步任务实际上也是一个flink任务,最终的提交的还是一个flink任务,而flink任务实际上就是个java任务,用jps命令都是可以查到的.

我们在编写flink streaming程序的时候,实际上主要的流程都是在一个main方法中,而main方法是可以接收参数的,所以这块设计起来其实很简单就是在shell脚本中获取到FLINK_HOME路径,然后将yaml文件通过命令行的方式传递到main方法中,然后再设计一个类来解析这个yaml文件形成一个任务实体类,然后根据这个实体类来生成一个flink任务,这就是一个大概的思路,里面肯定还有很多的细节,接下来就通过这个flink-cdc.sh脚本的解读来进一步看看大佬们是如何来实现这一功能的.

流程图

这里使用一个流程图来描述整个的流程,看完这个就知道这一脚本的大概内容了,如果有兴趣可以继续往下阅读,后面都是将脚本的一行一行的解读并配有中文注释.
image.png

flink-cdc.sh解读

源码路径 : flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh

完整代码

#!/usr/bin/env bash
################################################################################
#  Copyright 2023 Ververica Inc.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################# Setup FLINK_HOME
args=("$@")
# Check if FLINK_HOME is set in command-line arguments by "--flink-home"
for ((i=0; i < ${#args[@]}; i++)); docase "${args[i]}" in--flink-home)if [[ -n "${args[i+1]}" ]]; thenFLINK_HOME="${args[i+1]}"breakfi;;esac
done
if [[ -z $FLINK_HOME ]]; thenecho "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."exit 1
fi# Setup Flink related configurations
# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
_FLINK_HOME_DETERMINED=1
# FLINK_CONF_DIR is required by config.sh
FLINK_CONF_DIR=$FLINK_HOME/conf
# Use config.sh to setup Flink related configurations
. $FLINK_HOME/bin/config.sh# Define Flink CDC directories
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
FLINK_CDC_HOME="$SCRIPT_DIR"/..
export FLINK_CDC_HOME=$FLINK_CDC_HOME
FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log# Build Java classpath
CLASSPATH=""
# Add Flink libraries to the classpath
for jar in "$FLINK_HOME"/lib/*.jar; doCLASSPATH=$CLASSPATH:$jar
done
# Add Flink CDC libraries to classpath
for jar in "$FLINK_CDC_LIB"/*.jar; doCLASSPATH=$CLASSPATH:$jar
done
# Add Hadoop classpath, which is defined in config.sh
CLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
# Trim classpath
CLASSPATH=${CLASSPATH#:}# Setup logging
LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)# JAVA_RUN should have been setup in config.sh
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"

逐行解析

参数传入

#!/usr/bin/env bash
# Setup FLINK_HOME
# 获取这个脚本的所有参数,然后存储到args变量中
# ${#args[@]} 获取数组长度
# ${args[i]} 获取数组第i个值
args=("$@")

设置FLINK_HOME这个变量

# Check if FLINK_HOME is set in command-line arguments by "--flink-home"
# 遍历传入的参数检查是否FLINK_HOME这个环境变量是通过命令行参数 --flink-home传递进来的
# shell中case的语法
# case 值 in
#      模式1) # 这里的模式指的是shell中的通配符模式不是正则表达式,例如 a*,就是a开头的任意字符串
#           代码块
#           ;;
#      模式2)
#           代码块
#           ;;
#      *)
#           默认代码块
#           ;;
# esac
for ((i=0; i < ${#args[@]}; i++)); docase "${args[i]}" in--flink-home)# 如果匹配到到了就取他的下一个值给FLINK_HOME赋值,取值之前要判断一下是否存在# -n 就是检查字符串长度是否大于0,大于0返回true,否则falseif [[ -n "${args[i+1]}" ]]; thenFLINK_HOME="${args[i+1]}"breakfi;;esac
done

校验FLINK_HOME这个变量是否设置成功

# 如果经过上面的循环还是没有给FLINK_HOME赋值就退出程序
# 提示 [错误] 不能够在命令行参数--flink-home 或者 环境变量FLINK_HOME 找到 FLINK_HOME的值 
if [[ -z $FLINK_HOME ]]; thenecho "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."exit 1
fi

获取Flink的一些相关配置

# Setup Flink related configurations
# 设置flink相关的配置
# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
# 为了避免config.sh(这个文件在$FLINK_HOME/bin/config.sh)覆盖掉FLINK_HOME这个变量,所以这里将它置位1
# 为什么置为1呢,这里可以看一下config.sh中的相关代码,如下
# 可以看到如果变量_FLINK_HOME_DETERMINED为空那么就会把FLINK_HOME的值替换掉,所以这里将它的值赋值为1就是为了避免这个
# 具体FLINK_HOME会被替换成什么值呢
# dirname 就是要获取文件路径的路径,例如dirname /home/user/a.txt 返回 /home/user/
# $SYMLINK_RESOLVED_BIN 是什么值呢
# 是切换到$bin路径下,的绝对路径(pwd -P的意思就是获取实际文件系统路径,pwd是获取链接路径)
# $bin是target的路径
# target="$0" # $0就是当前脚本的名称
# -L 判断是否是一个链接符号,判断target是否是一个链接符号
# 如果是一个链接符号,那么就执行循环的代码块
# 跳出的条件是target变量不是一个链接符号或者循环了100次跳出循环,-gt是大于 -ge是大于等于
# ls 就是列出目录信息
# -ld 有两个参数 -l和-d,-l是长格式进行显示,包括文件的属性和权限信息,相当于ll
# -d是只显示目录自身的信息,而不列出目录中的文件,无论是文件还是目录,都不会进入它,仅是显示它自身的信息
# -- 是一个特殊的选项,	用于分隔选项与参数.它的作用是确保$target被视作参数,即使$target是 - 开头的,避免将其解析成选项
# 解释一下 target=`expr "$ls" : '.* -> \(.*\)$'`
# 这行大概意思就是通过expr命令和正则表达式提取$ls变量中符号链接的目标路径或者目录,然后赋值给target
# expr 是一个执行表达式的命令
# "$ls" 是作为参数传递给expr
# : '.* -> \(.*\)$' 这是一个正则表达式,用于匹配符号链接中的目标文件或目录.通过使用圆括号 ( ) 捕获模式,可以将匹配到的部分提取出来# target="$0"
# # For the case, the executable has been directly symlinked, figure out
# # the correct bin path by following its symlink up to an upper bound.
# # Note: we can't use the readlink utility here if we want to be POSIX
# # compatible.
# iteration=0
# while [ -L "$target" ]; do
#     if [ "$iteration" -gt 100 ]; then
#         echo "Cannot resolve path: You have a cyclic symlink in $target."
#         break
#     fi
#     ls=`ls -ld -- "$target"`
#     target=`expr "$ls" : '.* -> \(.*\)$'`
#     iteration=$((iteration + 1))
# done
# Convert relative path to absolute path and resolve directory symlinks
# bin=`dirname "$target"`
# SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
# if [ -z "$_FLINK_HOME_DETERMINED" ]; then
#     FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
# fi_FLINK_HOME_DETERMINED=1
# FLINK_CONF_DIR is required by config.sh
# config.sh 需要 FLINK_CONF_DIR 配置
FLINK_CONF_DIR=$FLINK_HOME/conf
# Use config.sh to setup Flink related configurations
# 使用config.sh来配置 Flink相关的配置
. $FLINK_HOME/bin/config.sh

定义Flink cdc 的一些路径

# Define Flink CDC directories
# 定义Flink cdc 的路径
# SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
# 这行的大概意思就是要获取脚本的绝对路径
# ${BASH_SOURCE[0]} bash的特殊变量,获取当前运行脚本的名称
# $(dirname -- ${BASH_SOURCE[0]}) 获取当前运行脚本的路径(不能直接用这个,因为可能会因为软连接或者其他情况导致路径获取不准确,最稳妥的方法就是cd 到这个路径然后pwd获取绝对路径),这里的 -- 就是防止后面的变量被识别成选项例如-开头
# cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" 切换到这个路径下
# &> /dev/null 就是将一些标准输出和错误输出都重定向到/dev/null,这样可以使输出更清晰
# && 当前一个命令执行成功后执行后面的命令
# pwd 获取当前路径# FLINK_CDC_HOME="$SCRIPT_DIR"/..
# SCRIPT_DIR 的上级路径就是FLINK_CDC_HOME的值,就是切换到了bin目录的根目录
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
FLINK_CDC_HOME="$SCRIPT_DIR"/..
export FLINK_CDC_HOME=$FLINK_CDC_HOME
FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log

构建任务启动需要的classpath

# Build Java classpath
# 构建 Java的calsspath
CLASSPATH=""
# Add Flink libraries to the classpath
# 将flink路径下lib的jar包都添加到classpath中
for jar in "$FLINK_HOME"/lib/*.jar; doCLASSPATH=$CLASSPATH:$jar
done
# Add Flink CDC libraries to classpath
# 将cdc下lib的jar包都添加到classpath
for jar in "$FLINK_CDC_LIB"/*.jar; doCLASSPATH=$CLASSPATH:$jar
done
# Add Hadoop classpath, which is defined in config.sh
# 添加hadoop 的classpath
CLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
# Trim classpath
# 去掉字符串开头的冒号 ,如果要去掉结尾的冒号 ${CLASSPATH%:}
CLASSPATH=${CLASSPATH#:}

设置日志相关的配置

# Setup logging
# 配置日志
LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
# 启动命令中将日志的配置参数拼接,指定日志文件以及日志配置文件
LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)

启动任务

# JAVA_RUN should have been setup in config.sh
# exec 是一个用于替换当前进程的命令,一般用在脚本中,会将当前脚本的执行进程执行的内容替换成exec后面命令
# 有什么作用呢?
# 1.减少系统资源 : 不用创建一个新的进程
# 2.重定向标准输入/输出 : 通过使用 exec 命令执行新的命令.可以将标准输入,输出和错误重定向到新命令所指定的位置.
# 3.执行后续操作:在脚本中,使用 exec 命令可以执行一些命令或操作后,将控制权交给新的命令.这可以用于在脚本中完成某些初始化操作后,将脚本完全替换为另一个命令或程序.
# $JAVA_RUN 在config.sh就定义了,一般是java 或者 /bin/java
# -classpath 指定classpath路径
# "${LOG_SETTINGS[@]}" 日志的一些配置信息
# com.ververica.cdc.cli.CliFrontend 入口类
# "$@" 所有的命令行参数传到入口类中,通过String args[] 来接收
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"

参考

[1] : https://github.com/apache/flink

[2] : https://github.com/ververica/flink-cdc-connectors

[3] : https://blog.csdn.net/wang2leee/article/details/132521566

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2779126.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

116.乐理基础-五线谱-音值组合法(一)

内容参考于&#xff1a;三分钟音乐社 上一个内容&#xff1a;115.乐理基础-五线谱-五线谱的练习方式-CSDN博客 音值组合法有点鸡肋&#xff0c;有两个原因 1. 它的本质只是为了让乐谱里的音符&#xff0c;在外观上组合得更加方便我们去看谱&#xff0c;并不涉及到什么很重要很…

开源版发卡小程序源码,云盘发卡微信小程序源码带PC端

一款发卡小程序。带PC端 系统微信小程序前端采用nuiapp 后端采用think PHP6 PC前端采用vue开发 使用HBuilderX工具打开&#xff0c;运行到微信小程序工具&#xff0c;系统会自动打包微信小程序代码 修改文件common/request/request.js 改成你的后端网址 微信小程序端完全…

猫头虎分享已解决Bug || Docker Error: Request Canceled While Waiting for Connection

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

Java图形化界面编程——弹球游戏 笔记

Java也可用于开发一些动画。所谓动画&#xff0c;就是间隔一定的时间(通常小于0 . 1秒 )重新绘制新的图像&#xff0c;两次绘制的图像之间差异较小&#xff0c;肉眼看起来就成了所谓的动画 。 ​ 为了实现间隔一定的时间就重新调用组件的 repaint()方法&#xff0c;可以借助于…

怎么用postman调用webservice(反推SoapUI)

<soapenv:Envelope xmlns:soapenv“http://schemas.xmlsoap.org/soap/envelope/” xmlns:lis“LisDataTrasen”> soapenv:Header/ soapenv:Body lis:Test lis:test111111111</lis:test> </lis:Test> </soapenv:Body> </soapenv:Envelope> Conten…

Netty应用(四) 之 Reactor模型 零拷贝

目录 6.Reactor模型 6.1 单线程Reactor 6.2 主从多线程Reactor (主--->Boss | 从--->Worker | 一主多从机制) 7.扩展与补充 8.Reactor模型的实现 8.1 多线程Reactor模型的实现&#xff08;一个Boss线程&#xff0c;一个Worker线程&#xff09; 8.2 多线程Reactor模…

智能时代:创新创业的新机遇与挑战

智能时代为创新创业提供了广阔的空间和无限的可能性。以下是一些可能适合智能时代背景的创新创业方向&#xff1a; 人工智能技术应用&#xff1a;人工智能技术是当前科技领域最热门的技术之一&#xff0c;其应用范围不断扩大。创业者可以将人工智能技术应用于各个领域&#xf…

vue-生命周期+工程化开发(三)

生命周期 Vue 生命周期 和 生命周期的四个阶段 思考&#xff1a; 什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09;什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a;一个Vue实例从 创建…

OnlyOffice-8.0版本深度测评

OnlyOffice 是一套全面的开源办公协作软件&#xff0c;不断演进的 OnlyOffice 8.0 版本为用户带来了一系列引人瞩目的新特性和功能改进。OnlyOffice 8.0 版本在功能丰富性、安全性和用户友好性上都有显著提升&#xff0c;为用户提供了更为强大、便捷和安全的文档处理和协作环境…

C#,最大公共子序列(LCS,Longest Common Subsequences)的算法与源代码

1 最大公共子序列 最长的常见子序列问题是寻找两个给定字符串中存在的最长序列。 最大公共子序列算法&#xff0c;常用于犯罪鉴定、亲子鉴定等等的 DNA 比对。 1.1 子序列 让我们考虑一个序列S<s1&#xff0c;s2&#xff0c;s3&#xff0c;s4&#xff0c;…&#xff0c;…

node.js+vue企业人事自动化办公oa系统c288a

采用B/S模式架构系统&#xff0c;开发简单&#xff0c;只需要连接网络即可登录本系统&#xff0c;不需要安装任何客户端。开发工具采用VSCode&#xff0c;前端采用VueElementUI&#xff0c;后端采用Node.js&#xff0c;数据库采用MySQL。 涉及的技术栈 1&#xff09; 前台页面…

STM32单片机的基本原理与应用(七)

超声波测距实验 基本原理 超声波测距实验是STM32单片机通过控制HC-SR04超声波模块&#xff0c;使其发送超声波&#xff0c;遇到物体反射回超声波来实现距离测量&#xff0c;其原理就是在发射超声波到接收超声波会有一段时间&#xff0c;而超声波在空气中传播的速度为声速&…

uv机器电机方向极性

爱普生主板设置X、Y 电机方向极性&#xff1a;请根据实际情况设置&#xff0c;开机初始化时如果电机运动方向反了则修改此极性。 理光主板设置X、Y 电机方向极性

《乱弹篇(十三)明朝事儿》

2024年农历除夕夜&#xff0c;因追剧收看电视连续剧《后宫》而放弃了收看一年一度的《春晚》&#xff0c;至到春节&#xff08;农历正月初一&#xff09;晚才看完了《后宫》。 社交网站“必应”图片《后宫》 电视连续剧《后宫》&#xff0c; 讲的是明朝英宗末年的历史故事&…

渗透专用虚拟机(公开版)

0x01 工具介绍 okfafu渗透虚拟机公开版。解压密码&#xff1a;Mrl64Miku&#xff0c;压缩包大小&#xff1a;15.5G&#xff0c;解压后大小&#xff1a;16.5G。安装的软件已分类并在桌面中体现&#xff0c;也可以使用everything进行查找。包含一些常用的渗透工具以及一些基本工…

2.10日学习打卡----初学RocketMQ(一)

2.10日学习打卡 对于MQ(Message queue)消息队列的一些解释可以看我原来写的文章 初学RabbitMQ 各大MQ产品比较 一.RocketMQ概述 发展历程 RocketMQ概念术语 生产者和消费者 生产者负责生产消息&#xff0c;一般由业务系统负责生产消息&#xff0c;消费者即后台系统&…

【计算机网络】进程通信

进程 process 客户和服务器进程 下载文件表示为客户 &#xff0c;上载文件的对等方表示为服务器进程与计算机网络之间的接口 套接字 socket 应用层与传输层之间的接口是建立网络应用程序的可编程接口 API进程寻址 为了标识接收进程 需要两种信息 主机的地址目的主机中的接收进程…

2024牛客寒假算法基础集训营2

C Tokitsukaze and Min-Max XOR 题目大意 给定一个数组从任取数构成序列序列满足&#xff0c;&#xff08;可以只取一个数&#xff09;问能构造出多少个 解题思路 定找双枚举时间复杂度到&#xff0c;考虑利用加速统计的方案&#xff0c;即将数字按二进制位拆分挂在树上对于…

任务管理软件的实用价值及优选推荐:提升工作效率的利器

任务管理软件是一种用于组织任务、将任务分配给个人并监控其进展的软件。该软件可以帮助确保任务在预算内按时完成。它在协同工作环境中特别有用&#xff0c;在这种环境中多人在处理需要跟踪和监视的任务。无论是初创公司、中小型企业还是大型组织&#xff0c;都可以从任务管理…

猫头虎分享已解决Bug || Error from Server (Timeout) in Kubernetes Pods

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …