Windows系统安装Flink及实现MySQL之间数据同步

        Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink的设计目标是在所有常见的集群环境中运行,并以内存执行速度和任意规模来执行计算。它支持高吞吐、低延迟、高性能的流处理,并且是一个面向流处理和批处理的分布式计算框架,将批处理看作一种特殊的有界流。

Flink的主要特点包括:

  1. 事件驱动型:Flink是一个事件驱动型的应用,可以从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
  2. 支持有状态计算:Flink提供了Extactor-once语义及checkpoint机制,支持带有事件操作的流处理和窗口处理,以及灵活的窗口处理(如时间窗口、大小窗口等)。
  3. 轻量级容错处理:Flink使用savepoint进行错误恢复,可以在出现故障时快速恢复任务。
  4. 高吞吐、低延迟、高性能:Flink的设计目标是在保证数据处理稳定性的同时,实现高吞吐、低延迟、高性能的流处理。
  5. 支持大规模集群模式:Flink支持在yarn、Mesos、k8s等大规模集群环境中运行。
  6. 支持多种编程语言:Flink对java、scala、python都提供支持,但最适合使用java进行开发。

        Flink的应用场景非常广泛,可以用于实时流数据的分析计算、实时数据与维表数据关联计算、实时数仓建设、ETL(提取-转换-加载)多存储系统之间进行数据转化和迁移等场景。同时,Flink也适用于事件驱动型应用场景,如以kafka为代表的消息队列等。

1.Winows系统安装Flink

下载地址:Downloads | Apache Flink

选择 Apache Flink 1.16.0 - 2022-10-28 (Binaries)

下载 flink-1.16.0-bin-scala_2.12.tgz

使用CMD窗口,在Flink安装路径/bin目录下启动start-cluster.bat

访问http://localhost:8081,界面如下:

2.使用Flink实现MySQL数据库之间数据同步(JAVA)

<flink.version>1.16.0</flink.version>
<flink-cdc.version>2.3.0</flink-cdc.version>

1.创建Flink流处理运行环境。

2.设置流处理并发数。

3.设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步。

4.在Flink中创建中间同步数据库。

5.在Flink中创建中间表flink_source,来源于MySQL表source,(注意connector为mysql-cdc)。

6.在Flink中创建中间表flink_sink,来源于MySQL表sink。

7.将Flink中间表来源表数据写入flink_sink表,Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步。

package com.demo.flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkCdcMySql {public static void main(String[] args) {System.out.println("==========start run FlinkCdcMySql#main.");// 创建Flink流处理运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081);// 设置流处理并发数env.setParallelism(3);// 设置Flink存档间隔时间,单位为ms,当同步发生异常时会恢复最近的checkpoint继续同步env.enableCheckpointing(5000);final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 在Flink中创建中间同步数据库tEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_test");// 在Flink中创建中间表flink_source,来源于MySQL表source// 注意connector为mysql-cdctEnv.executeSql("CREATE TABLE flink_test.flink_source (\n" +"    id int,\n" +"    name varchar(255),\n" +"    create_time TIMESTAMP\n," + // Flink不支持datetime格式"    PRIMARY KEY (id) NOT ENFORCED" + //主键必须标明NOT ENFORCED") WITH (\n" +"  'connector'  = 'mysql-cdc',\n" +"  'hostname'   = '127.0.0.1',\n" +"  'database-name'   = 'flink-source',\n" +"  'table-name' = 'source',\n" +"  'username'   = 'root',\n" +"  'password'   = 'root'\n" +")");// 在Flink中创建中间表flink_sink,来源于MySQL表sinktEnv.executeSql("CREATE TABLE flink_test.flink_sink (\n" +"    id int,\n" +"    name varchar(255),\n" +"    create_time TIMESTAMP\n," +"    PRIMARY KEY (id) NOT ENFORCED" +") WITH (\n" +"  'connector'  = 'jdbc',\n" +"  'url'        = 'jdbc:mysql://127.0.0.1:3306/flink-sink',\n" +"  'table-name' = 'sink',\n" +"  'driver'     = 'com.mysql.jdbc.Driver',\n" +"  'username'   = 'root',\n" +"  'password'   = 'root'\n" +")");//        Table transactions = tEnv.from("flink_source");
//        transactions.executeInsert("flink_sink");System.out.println("==========begin Mysql data cdc.");// 将Flink中间表来源表数据写入flink_sink表// Flink会根据MySQL binlog中source表变化,动态更新flink_sink表,同时会将flink_sink表数据写入MySQL sink表,实现MySQL数据持续同步tEnv.executeSql("INSERT INTO flink_test.flink_sink(id, name, create_time)\n" +"select id, name, create_time\n" +"from flink_test.flink_source\n");System.out.println("==========continue Mysql data cdc.");}}

git代码地址:

flink-cdc-MySQL: FlinkCDC实现MySQL之间数据同步

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2778493.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

基于JavaWeb的网上订餐项目

点击以下链接获取源码&#xff1a; https://download.csdn.net/download/qq_64505944/88825723?spm1001.2014.3001.5503 Java项目-16 浏览商品&#xff0c;会员登录&#xff0c;添加购物车&#xff0c;进行配送等功能 文件代码功能介绍 1.Src下的java文件存放的我们后端的…

第三章 搜索与图论(三)(最小生成树,二分图)

一、最小生成树算法 稠密图使用prim算法&#xff0c;稀疏图使用kruskal算法 二、prim算法求最小生成树 prim和dijkstra算法类似&#xff0c;都是找到符合某种条件的点&#xff0c;然后更新。prim使用到已经构成的部分最小树所有结点中最小的距离。dijkstra算法是使用到起点最…

43.1k star, 免费开源的 markdown 编辑器

简介 项目名&#xff1a; MarkText-- 简单而优雅的开源 Markdown 编辑器 Github 开源地址&#xff1a; https://github.com/marktext/marktext 官网&#xff1a; https://www.marktext.cc/ 支持平台&#xff1a; Linux, macOS 以及 Windows。 操作界面&#xff1a; 在操作界…

七、滚动条操作——调整图像对比度

对比度调整&#xff1a;是在原来图像基础上进行相应的公式调整&#xff0c;是类似乘法操作&#xff0c;本身像数值越大&#xff0c;对比度增加之后其与低像素点值差距越大&#xff0c;导致对比增强 项目最终效果&#xff1a;通过滚动条trackbar来实现调整图片亮度的功能 我这里…

小游戏和GUI编程(5) | SVG图像格式简介

小游戏和GUI编程(5) | SVG图像格式简介 0. 问题 Q1: SVG 是什么的缩写&#xff1f;Q2: SVG 是一种图像格式吗&#xff1f;Q3: SVG 相对于其他图像格式的优点和缺点是什么&#xff1f;Q4: 哪些工具可以查看 SVG 图像&#xff1f;Q5: SVG 图像格式的规范是怎样的&#xff1f;Q6…

基于JSP的网上购书系统

点击以下链接获取源码&#xff1a; https://download.csdn.net/download/qq_64505944/88825694?spm1001.2014.3001.5503 Java项目-15 源码论文数据库配置文件 基于JSP的网上购书系统 摘要 在当今的社会中&#xff0c; 随着社会经济的快速发展以及计算机网络技术和通讯技术…

css2复合选择器

一.后代&#xff08;包含&#xff09;选择器&#xff08;一样的标签可以用class命名以分别&#xff09; 空格表示 全部后代 应用 二.子类选择器 >表示 只要子不要孙 应用 三.并集选择器 &#xff0c;表示 代表和 一般竖着写 应用 四.伪类选择器&#xff08;包括伪链接…

python WEB接口自动化测试之requests库详解

由于web接口自动化测试需要用到python的第三方库--requests库&#xff0c;运用requests库可以模拟发送http请求&#xff0c;再结合unittest测试框架&#xff0c;就能完成web接口自动化测试。 所以笔者今天先来总结一下requests库的用法。希望对大家&#xff08;尤其是新手&…

[C# WPF] DataGrid选中行或选中单元格的背景和字体颜色修改

问题描述 WPF中DataGrid的选中行或选中者单元格&#xff0c;在焦点失去后&#xff0c;颜色会很淡&#xff0c;很不明显&#xff0c;不容易区分。 解决方法 在失去焦点的情况下&#xff0c;如何设置行或单元格与选中的时候颜色一样&#xff1f; <DataGrid.Resources>&…

Postgresql 的编译安装与包管理安装, 全发行版 Linux 通用

博客原文 文章目录 实验环境信息编译安装获取安装包环境依赖编译安装安装 contrib 下工具代码 创建用户创建数据目录设置开机自启动启动数据库常用运维操作 apt 安装更新源安装 postgresql开机自启修改配置修改密码 实验环境信息 Ubuntu 20.04Postgre 16.1 编译安装 获取安装…

BUUCTF LKWA

1.访问页面。 2.选择 Variables variable 关卡 3.获得flag http://357dab81-78b8-4d74-976a-4a69dd894542.node5.buuoj.cn:81/variables/variable.php?funcpassthru&inputcat%2Fflagflag{0020ced6-8166-4fa5-87a7-7d93ee687c3e}

【Linux笔记】动静态库的封装和加载

一、静态库的封装 我们在学习C语言阶段其实就已经知道一个可执行程序的形成过程分为预处理、编译、汇编、链接这四个阶段&#xff0c;而且也知道我们程序中使用的各种库其实是在链接的阶段加载的。 可我们那时候并不知道库是怎么被加载的&#xff0c;或者库是怎么形成的&…

结构体的大小以及内存对齐问题

结构体的大小怎么计算&#xff1f;什么是结构体的对齐&#xff1f; 首先想要直到结构体的大小需要先了解结构体的内存对齐。那么&#xff0c;什么是结构体的内存对齐&#xff1a; 什么是结构体内存对齐 结构体的对齐 就是 结构体类型数据在内存中按照一定的对齐规律储存。结…

python+django高校活动报名场地管理系统l1ro4

校园活动管理平台程序的开发&#xff0c;在数据库的选择上面&#xff0c;选择功能强大的MySQL数据库进行数据的存放操作。 技术栈 后端&#xff1a;python 前端&#xff1a;vue.jselementui 框架&#xff1a;django Python版本&#xff1a;python3.7 数据库&#xff1a;mysql5…

实现自定义标记

实现自定义标记 问题陈述 New Tech Book的高级管理层决定在其用JSP设计的应用程序的所有页面上显示版权信息。它们还要去如何向应用程序中添加JSP页面,可以重用显示版本信息的代码。公司的软件开发人员Jerry Smith决定用自定义标记来创建应用程序的这一部分。 解决方案 要解…

kali最新最简单安装

之前都是用iso镜像文件的 今年好多东西都删库了&#xff0c;所有还是要主要资源的保存 去官网找下载 一般来说都是用虚拟机的 下载完会是一个压缩文件&#xff0c; 解压&#xff0c;然后操作之前需要先下载虚拟机 打开方式用虚拟机打开 kali就按装好了

腾讯云4核8G服务器可以用来干嘛?怎么收费?

腾讯云4核8G服务器适合做什么&#xff1f;搭建网站博客、企业官网、小程序、小游戏后端服务器、电商应用、云盘和图床等均可以&#xff0c;腾讯云4核8G服务器可以选择轻量应用服务器4核8G12M或云服务器CVM&#xff0c;轻量服务器和标准型CVM服务器性能是差不多的&#xff0c;轻…

Minecraft的红石教程之电梯

一.前言 我记得是上初中的时候&#xff0c;就看到了这类电梯。现在我在看现在这类电梯的相关视频&#xff0c;大多是盗用创意未能领会其中的红石运作规律&#xff0c;于是我就删繁就简写了这篇。 二.步骤 1.材料 粘性活塞&#xff0c;黏液块&#xff0c;红石&#xff0c;红…

考研数据结构笔记(6)

单链表的建立 单链表的建立尾插法头插法 双链表初始化插入删除遍历小结 单链表的建立 尾插法 首先对单链表进行定义&#xff0c;然后初始化 法1&#xff1a;定义遍历链表的插入函数 法2&#xff1a;利用指针移动建立函数 头插法 带头结点 双链表 初始化 插入 p节点不是最后…

【Linux】学习-基础IO—下

Linux基础IO—上 重定向 通过上篇的学习&#xff0c;我们了解了文件描述符的分配规则是遍历指针数组&#xff0c;用没有被使用的最小下标作为新的文件描述符&#xff0c;也就是我们可以通过关闭三个标准流文件并使用他们原先所占用的0&#xff0c;1&#xff0c;2描述符。 那…