Flink CDC 提取记录变更时间作为事件时间和 Hudi 表的 precombine.field 以及1970-01-01 取值问题

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

CDC 数据中的记录变更时间标记着这条记录在数据库中执行对应操作(创建/更新/删除)的时间,可以说是天然的“事件时间”,特别是对于那些本身没有记录时间字段的表来说就更加合适了。Flink 官方文档 也建议在使用 CDC 的情况下,优先使用 CDC 中的这个时间字段,这个时间更加精准。

与此同时,在定义 Hudi 表时,precombine.field 也是一个非常重要的配置,显然 CDC 数据中的记录变更时间是最合适的,没有之一。

CDC 数据中的记录变更时间属于元数据范畴,以 Flink CDC 的 MySQL 数据库为例,它提供四种元数据的抽取:

KeyDataTypeDescription
table_nameSTRING NOT NULLName of the table that contain the row.
database_nameSTRING NOT NULLName of the database that contain the row.
op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0.
row_kindSTRING NOT NULLIt indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if the source operator chooses to output the ‘row_kind’ column for each record. It is recommended to use this metadata column only in simple synchronization jobs. ‘+I’ means INSERT message, ‘-D’ means DELETE message, ‘-U’ means UPDATE_BEFORE message and ‘+U’ means UPDATE_AFTER message.

其中的 op_ts 就是我们想要的,也就是:CDC 数据中的记录变更时间。我们可以在定义数据表时声明这个列,Flink CDC 可以将其提取出来作为普通字段供下游使用,就像下表中这样:

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (`order_number` INT NOT NULL,`order_date` DATE NOT NULL,`purchaser` INT NOT NULL,`quantity` INT NOT NULL,`product_id` INT NOT NULL,`op_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc',...
);

注意,在定义 Flink CDC 源表时,op_ts 的数据类型是 TIMESTAMP_LTZ(3),不是 TIMESTAMP(3),写入下游表时,可以是 TIMESTAMP(3)

当我们初次使用这个 op_ts 字段时,你会发现,写入到的数据库的数据全部都是 1970-01-01 00:00:00.000,就像下面这样:

在这里插入图片描述

你可能会认为是哪里出错了,实际上,这是 Flink CDC 特别设计的,也是合理的,Flink CDC 官方文档的解释是:

If the record is read from snapshot of the table instead of the binlog, the value is always 0.

我们知道,Flink CDC ( 2.0+ ) 的一个显著特征是:它是全量 + 增量的一体化读取!全量就是经常说的历史数据,增量就是实时的数据,控制 Flink CDC 是从全部历史数据开始同步整个数据库还是从只当下的 binlog 中同步近期增量数据的配置项是:scan.startup.mode ( 官方文档 ),该配置项支持 5 种配置,而默认配置(initial)就是以当前分界点,数据中的现有数据使用全量方式读取(也叫快照读取),此后的数据从 binlog 中读取,这样就和上面描述的 op_ts 字段的取值吻合上了:

当 Flink CDC 使用全量方式读取表中的历史数据时,op_ts 字段全部取值为 0,即 1970-01-01 00:00:00.000,当 Flink CDC 使用增量方式读取 binlog 数据时,op_ts 字段的取值为数据发生变更的实际时间

这种设计还是非常合理的,因为,Flink CDC 本身在使用快照方式读取时,就没有任何变更时间可以读取,这个时间只在 binlog 中才有,而这对下游也不会造成太大的影响,因为此时的数据都是 insert-only 的数据,同一主键也不会出现两条记录,至少对 Hudi 表是没有影响的。

此外,作为一个“额外收获”,你会发现:op_ts 这个字段本身恰好标记了一条记录是通过全量同步进来的,还是增量同步进来的!


补充:以下是 Flink CDC 官方文档对 scan.startup.mode 5 种同步模式的解释:

The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer. The valid enumerations are:

  • initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
  • earliest-offset: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.
  • latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
  • specific-offset: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.
  • timestamp: Skip snapshot phase and start reading binlog events from a specific timestamp.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2813978.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

C++:list容器(非原生指针迭代器的实现)

本章是STL容器 list 的模拟实现。 之前已经使用 C语言 对带头双向循环链表 进行实现,详见数据结构: 线性表(带头双向循环链表实现), 相较于之前的实现,C 下多了对迭代器以及模板等相关语法特性。下面将着重讲解这些新知识。 文章目录 一. list 的基本框架…

数据结构学习——跳表

假设我们有一个有序链表A,其中元素为1,3,4,5,7,8,9,10,13,16,17,18 我们在找寻其中的元素的时候,需要我们从头开始向下找寻。因此时间复杂度为O(n)。为了减少时间复杂度,我们提出了跳表的概念 原始链表 跳表 可以看到,我们实际…

HTML-基础标签

1. HTML初识 1.1 什么是HTML HTML(英文Hyper Text Markup Language的缩写)中文译为“超文本标签语言”,是用来描述网页的一种语言。所谓超文本,因为它可以加入图片、声音、动画、多媒体等内容,不仅如此,它还…

[Mac软件]Adobe Substance 3D Stager 2.1.4 3D场景搭建工具

应用介绍 Adobe Substance 3D Stager,您设备齐全的虚拟工作室。在这个直观的舞台工具中构建和组装 3D 场景。设置资产、材质、灯光和相机。导出和共享媒体,从图像到 Web 和 AR 体验。 处理您的最终图像 Substance 3D Stager 可让您在上下文中做出创造性…

论文阅读:SOLOv2: Dynamic, Faster and Stronger

目录 概要 Motivation 整体架构流程 技术细节 小结 论文地址:[2003.10152] SOLOv2: Dynamic and Fast Instance Segmentation (arxiv.org) 代码地址:GitHub - WXinlong/SOLO: SOLO and SOLOv2 for instance segmentation, ECCV 2020 & NeurIPS…

【OpenCV C++】Mat img.total() 和img.cols * img.rows 意思一样吗?二者完全相等吗?

文章目录 1 结论及区别2 Mat img的属性 介绍1 结论及区别 在大多数情况下,img.total() 和 img.cols * img.rows 是相等的,但并不总是完全相等的。下面是它们的含义和一些区别: 1.img.total() 表示图像中像素的总数,即图像的总像素数量。2.img.cols * img.rows 也表示图像中…

【机器人最短路径规划问题(栅格地图)】基于遗传算法求解

基于遗传算法求解机器人最短路径规划问题(栅格地图)的仿真结果 仿真结果: 路径长度的变化曲线: 遗传算法优化后的机器人避障路径:

ky10-server docker 离线安装包、离线安装

离线安装脚本 # ---------------离线安装docker------------------- rpm -Uvh --force --nodeps *.rpm# 修改docker拉取源为国内 rm -rf /etc/docker mkdir -p /etc/docker touch /etc/docker/daemon.json cat >/etc/docker/daemon.json<<EOF{"registry-mirro…

yolov9 瑞芯微芯片rknn部署、地平线芯片Horizon部署、TensorRT部署

特别说明&#xff1a;参考官方开源的yolov9代码、瑞芯微官方文档、地平线的官方文档&#xff0c;如有侵权告知删&#xff0c;谢谢。 模型和完整仿真测试代码&#xff0c;放在github上参考链接 模型和代码。 之前写过yolov8检测、分割、关键点模型的部署的多篇博文&#xff0c;y…

flutter 加密安全

前言&#xff1a;数据安全 数据的加密解密操作在 日常网络交互中经常会用到&#xff0c;现在密码的安全主要在于 秘钥的安全&#xff0c;如论 DES 3DES AES 还是 RSA, 秘钥的算法&#xff08;计算秘钥不固定&#xff09; 和 保存&#xff0c;都决定了你的数据安全&#xff1b;…

电子电器架构新趋势 —— 最佳着力点:域控制器

电子电器架构新趋势 —— 最佳着力点&#xff1a;域控制器 我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师&#xff08;Wechat&#xff1a;gongkenan2013&#xff09;。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师…

wpf 数据绑定 数据转换

1.概要 数据绑定&#xff0c;有时候绑定的数据源和目标的数据类型不同&#xff0c;这时候就需要转换。 2.代码 2.1 xaml(eXtensible Application Markup Language) 可扩展应用程序标记语言 <Window x:Class"WpfApp6.MainWindow"xmlns"http://schemas.mi…

小白水平理解面试经典题目LeetCode 655. Print Binary Tree【Tree】

655 打印二叉树 一、小白翻译 给定二叉树的 root &#xff0c;构造一个 0 索引的 m x n 字符串矩阵 res 来表示树的格式化布局。格式化布局矩阵应使用以下规则构建&#xff1a; 树的高度为 height &#xff0c;行数 m 应等于 height 1 。 列数 n 应等于​​xheight1​​ - …

ROS-Ubuntu 版本相关

ROS-Ubuntu 版本相关&#xff1a;安装指引 年代ROS1版本Ubuntu 版本2014Indigo14.042016Kinetic16.042018Melodic18.042020Noetic20.04 & 22.04 ROS2兼顾了工业使用上的问题。 年代ROS2版本Ubuntu 版本2022Humble20.04 & 22.042023Iron16.04 相关参考&#xff1a; […

Linux/Spectra

Enumeration nmap 第一次扫描发现系统对外开放了22&#xff0c;80和3306端口&#xff0c;端口详细信息如下 22端口运行着ssh&#xff0c;80端口还是http&#xff0c;不过不同的是打开了mysql的3306端口 TCP/80 进入首页&#xff0c;点击链接时&#xff0c;提示域名不能解析&…

【深度学习目标检测】二十一、基于深度学习的葡萄检测系统-含数据集、GUI和源码(python,yolov8)

葡萄检测在农业中具有多方面的意义&#xff0c;具体来说如下&#xff1a; 首先&#xff0c;葡萄检测有助于保障农产品质量安全。通过对葡萄进行质量安全专项监测&#xff0c;可以确保葡萄中的农药残留、重金属等有害物质含量符合标准&#xff0c;从而保障消费者的健康。同时&am…

Ubuntu Mysql Innodb cluster集群搭建+MaxScale负载均衡(读写分离)

Ubuntu系统版本 20.04.3 LTS (Focal Fossa) 、64位系统。 cat /etc/os-release查看Ubuntu系统是32位还是64位 uname -m如果显示“i686”,则表示安装了32位操作系统。如果显示“x86_64”,则表示安装了64位操作系统。 一、安装MySql 参考: https://blog.csdn.net/qq_3712…

Idea安装gideabrowser插件

Idea安装gideabrowser插件 一、安装二、设置教程 一、安装 gideabrowser链接地址 二、设置教程 在人生的舞台上&#xff0c;奋力拼搏&#xff0c;才能演绎出最精彩的人生之歌。面对挑战和困难&#xff0c;不妥协、不气馁&#xff0c;只争朝夕&#xff0c;方显坚韧与智慧。努…

10.题号:编号3227 找到最多的数

题目&#xff1a; ###本题考察map和枚举 #include<bits/stdc.h> using namespace std; map<int,int> mp; int main(){int n,m;cin>>n>>m;for(int i1;i<m*n;i){int x;cin>>x;mp[x];}for(const auto & [x,y] : mp){if(2*y>n*m/2){cout…

【MATLAB】 LMD信号分解+FFT傅里叶频谱变换组合算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~ 展示出图效果 1 LMD分解算法 LMD (Local Mean Decomposition) 分解算法是一种信号分解算法&#xff0c;它可以将一个信号分解成多个局部平滑的成分&#xff0c;并且可以将高频噪声和低频信号有效地分离出来。LMD 分解算…