Flink学习记录

可以快速搭建一个Flink编写程序

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink \-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.17.1 \-DgroupId=com.zxx.langhuan \-DartifactId=langhuan-flink \-Dversion=1.0.0-SNAPSHOT \-Dpackage=com.zxx.langhuan.flink \-DinteractiveMode=false

Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs。

对于 Java,Flink 自带有 Tuple0 到 Tuple25 类型。

POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。

基本的 stream source

// DataStream API
// fromElements
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));// fromCollection
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);// socketTextStream
DataStream<String> lines = env.socketTextStream("localhost", 9999)// readTextFile
DataStream<String> lines = env.readTextFile("file:///path");// customSource
DataStreamSource<OUT> out = env.addSource(SourceFunction<OUT> function);// Table API
// 创建TableEnvironment(表环境)。 创建表环境时,你可以设置作业属性,定义应用的批流模式,以及创建数据源。 我们先创建一个标准的表环境,并选择流式执行器。
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
// 批处理模式
// EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tEnv = TableEnvironment.create(settings);tEnv.executeSql("CREATE TABLE transactions (\n" +"    account_id  BIGINT,\n" +"    amount      BIGINT,\n" +"    transaction_time TIMESTAMP(3),\n" +"    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +") WITH (\n" +"    'connector' = 'kafka',\n" +"    'topic'     = 'transactions',\n" +"    'properties.bootstrap.servers' = 'kafka:9092',\n" +"    'format'    = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE spend_report (\n" +"    account_id BIGINT,\n" +"    log_ts     TIMESTAMP(3),\n" +"    amount     BIGINT\n," +"    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +") WITH (\n" +"    'connector'  = 'jdbc',\n" +"    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +"    'table-name' = 'spend_report',\n" +"    'driver'     = 'com.mysql.jdbc.Driver',\n" +"    'username'   = 'sql-demo',\n" +"    'password'   = 'demo-sql'\n" +")");Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

支持的connectors

DataStream ConnectorsTable API Connectors
DataGen
Kafka
Cassandra
DynamoDB
ElasticSearch
Firehose
Kinesis
MongoDB
OpenSearch
文件系统
RabbitMQ
Google Cloud PubSub
Hybrid Source
Pulsar
JDBC
Upsert Kafka
HBase
Print
BlackHole
Hive

场景说明

一个 Flink 集群总是包含一个 JobManager 以及一个或多个 Flink TaskManager。
JobManager 负责处理 Job 提交、 Job 监控以及资源管理。
Flink TaskManager 运行 worker 进程, 负责实际任务 Tasks 的执行,而这些任务共同组成了一个 Flink Job。 
 

有界流:批处理

无界流:流处理

无状态的转换

map()

flatmap()

keyBy()

以下情况,一个类不能作为 key

  1. 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。
  2. 它是任意类的数组。

reduce() 和其他聚合算子  

数据流转换

有状态的转换

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

Rich Functions

  • open(Configuration c):仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。
  • close()
  • getRuntimeContext():为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。
  • setRuntimeContext()
  • getIterationRuntimeContext()

ValueState:要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。(相同的key共享)

  • value()
  • update()
  • clear()

Flink 明确支持以下三种时间语义:

  • 事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间

  • 摄取时间(ingestion time): Flink 读取事件时记录的时间

  • 处理时间(processing time): Flink pipeline 中具体算子处理事件的时间

Watermarks: 定义何时停止等待较早的事件

//        AssignerWithPunctuatedWatermarks 在每个事件上都可以提供水位线,因此水位线可以更加灵活和精确地根据事件的特性进行生成。
//        AssignerWithPeriodicWatermarks 在一定的时间间隔内提供水位线,因此水位线的生成不会频繁,适用于事件的时间戳变化频率较高,而水位线的变化频率较低的情况。
//        在选择使用哪个接口时,可以根据具体的业务需求和事件流的特点来决定。如果事件的时间戳和水位线的变化较为频繁,或者需要更精确的控制,可以选择 AssignerWithPunctuatedWatermarks。如果事件的时间戳变化较为平稳,或者水位线的变化不需要那么频繁,可以选择 AssignerWithPeriodicWatermarks。// WatermarkStrategy 接口包含以下方法:
//        withTimestampAssigner:指定如何从事件中提取事件时间戳(timestamp)。
//        withIdleness:配置是否在流处于空闲状态时发出水位线,默认为不发出水位线。
//        withIdlenessTimeout:指定流处于空闲状态多久后发出水位线。
//        withTimestampAssignerAndIdlenessTimeout:同时指定事件时间戳提取逻辑和流空闲状态的配置。// 通常,你可以通过静态工厂方法 WatermarkStrategy.forXXX() 来创建特定的水位线策略,其中 XXX 可以是以下几种类型之一:
//        forBoundedOutOfOrderness:适用于乱序但有界的事件流,根据最大允许的乱序程度指定固定的延迟时间。
//        forMonotonousTimestamps:适用于单调递增的事件流,例如源自某些消息队列的事件流。
//        forGenerator:通过自定义的水位线生成器函数来创建水位线策略。
//        forPeriodicBoundedOutOfOrderness:适用于周期性有界乱序事件流,指定固定的延迟时间和乱序间隔。
//        forEventTime:适用于事件时间已经正确嵌入在事件中的情况,不需要提取时间戳。DataStream<Event> stream = ...;WatermarkStrategy<Event> strategy = WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.timestamp);DataStream<Event> withTimestampsAndWatermarks =stream.assignTimestampsAndWatermarks(strategy);

Windows

Flink 有一些内置的窗口分配器,如下所示:

  • 滚动时间窗口
    • 每分钟页面浏览量
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口
    • 每10秒钟计算前1分钟的页面浏览量
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口
    • 每个会话的网页浏览量,其中会话之间的间隔至少为30分钟
    • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n)Time.seconds(n)Time.minutes(n)Time.hours(n), 和 Time.days(n)

窗口应用函数 

我们有三种最基本的操作窗口内的事件的选项:

  1. 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
  2. 或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
  3. 或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。
DataStream<SensorReading> input = ...;input.keyBy(x -> x.key).window(TumblingEventTimeWindows.of(Time.minutes(1))).process(new MyWastefulMax());public static class MyWastefulMax extends ProcessWindowFunction<SensorReading,                  // 输入类型Tuple3<String, Long, Integer>,  // 输出类型String,                         // 键类型TimeWindow> {                   // 窗口类型@Overridepublic void process(String key,Context context,Iterable<SensorReading> events,Collector<Tuple3<String, Long, Integer>> out) {int max = 0;for (SensorReading event : events) {max = Math.max(event.value, max);}out.collect(Tuple3.of(key, context.window().getEnd(), max));}
}// Context 接口 展示大致如下:
public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();
}// windowState 和 globalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。
// 增量聚合示例
DataStream<SensorReading> input = ...;input.keyBy(x -> x.key).window(TumblingEventTimeWindows.of(Time.minutes(1))).reduce(new MyReducingMax(), new MyWindowFunction());private static class MyReducingMax implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r1 : r2;}
}private static class MyWindowFunction extends ProcessWindowFunction<SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {@Overridepublic void process(String key,Context context,Iterable<SensorReading> maxReading,Collector<Tuple3<String, Long, SensorReading>> out) {SensorReading max = maxReading.iterator().next();out.collect(Tuple3.of(key, context.window().getEnd(), max));}
}

晚到的事件

  • 旁路输出

OutputTag<Event> lateTag = new OutputTag<Event>("late"){};SingleOutputStreamOperator<Event> result = stream.keyBy(...).window(...).sideOutputLateData(lateTag).process(...);DataStream<Event> lateStream = result.getSideOutput(lateTag);
  • 指定允许的延迟
stream.keyBy(...).window(...).allowedLateness(Time.seconds(10)).process(...);

如果延迟的事件需要特殊处理,可以从Context中获取水位线时间来做比较

public void processElement(TaxiFare fare,Context ctx,Collector<Tuple3<Long, Long, Float>> out) throws Exception {long eventTime = fare.getEventTime();TimerService timerService = ctx.timerService();if (eventTime <= timerService.currentWatermark()) {// 事件延迟;其对应的窗口已经触发。} else {// 其他处理}
}

深入了解窗口操作

滑动窗口是通过复制来实现的

滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。

时间窗口会和时间对齐

仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。

请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口 和 滑动窗口 。

window 后面可以接 window

比如说:

stream.keyBy(t -> t.key).window(<window assigner>).reduce(<reduce function>).windowAll(<same window assigner>).reduce(<same reduce function>);

可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。

之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。

空的时间窗口不会输出结果

事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。

延迟事件可能导致延迟合并

会话窗口的实现是基于窗口的一个抽象能力,窗口可以 聚合。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/1380128.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

网神 SecGate 3600 防火墙任意文件上传漏洞

网神 SecGate 3600 防火墙任意文件上传漏洞 一、 产品简介二、 漏洞概述三、 影响范围四、 复现环境五、 漏洞复现PoC上传哥斯拉马子小龙POC检测: 六、 修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具…

SCAU操作系统知识点之(六)并发:死锁和饥饿

1、死锁原因&#xff1a;竞争资源、进程推进顺序不当 2、资源分配图&#xff08;若死锁&#xff0c;则资源分配图中必有环路&#xff0c;但有环路时不一定死锁&#xff09; 3、死锁的四个必要条件 ①互斥 ②占有且等待 ③不可抢占 ④循环等待 在死锁的四个必要条件中&#xff…

Monge矩阵

Monge矩阵 对一个m*n的实数矩阵A&#xff0c;如果对所有i&#xff0c;j&#xff0c;k和l&#xff0c;1≤ i<k ≤ m和1≤ j<l ≤ n&#xff0c;有 A[i,j]A[k,l] ≤ A[i,l]A[k,j] 那么&#xff0c;此矩阵A为Monge矩阵。 换句话说&#xff0c;每当我们从矩阵中挑…

2.0 Maven基础

1. Maven概述 Maven概念 Apache Maven是一个软件项目管理工具&#xff0c;将项目开发和管理过程抽象程一个项目对象模型&#xff08;POM&#xff0c;Project Object Model&#xff09;。 Maven作用 项目构建 提供标准的、跨平台的自动化项目构建方式。 依赖管理 方便快捷…

字符转ASCII码

一、问题描述 二、代码内容 三、代码解释 # include <iostream> #include <cstdio> using namespace std; int main() { char a;//存放字符a scanf("%d",&a);//输入字符a printf("%d",a);//输出a对应的ASCII码 return 0&#xff1b; …

字符转 ASCII 码

字符转 ASCII 码 //字符转 ASCII 码//1.如下是转换单个字符 //#include <stdio.h> //int main() //{ // char c; // printf("输入一个字符: "); // // // 读取用户输入 // scanf("%c", &c); // // // %d 显示整数 // …

java字符与ASCII码相互转换

java字符与ASCII码相互转换 一 、遍历字符串二、 java 字符 转换 ASCII码三、 java ASCII码 转换 字符 字符串&#xff1a; String s "abcdefg";一 、遍历字符串 public static void main(String[] args) {String s "abcdefg";// 遍历字符串 for (i…

【LeetCode】45. 跳跃游戏 II - 贪婪算法

目录标题 2023-8-11 09:49:25 45. 跳跃游戏 II 2023-8-11 09:49:25 自己没做出来&#xff0c;废物Orz class Solution {public int jump(int[] nums) {int length nums.length;int end 0;int maxPosition 0;int steps 0;for (int i 0; i < length - 1; i) {maxPosit…

docker-compose redis 一直启动失败

环境&#xff1a; centos 8.x 背景 使用docker-compose 来启动redis docker-compose.yml 如下&#xff1a; version: 3.3 services:redis:image: redis:latestrestart: alwayscontainer_name: redisports:- 6379:6379volumes:- ./data:/redis/data- ./redis.conf:/redis/re…

用什么软件抓cd音轨音质最好_开车不嗨皮,那跟咸鱼有什么区别

文 | 大青枣 图 | 潘隐 跑长途是件很无聊的事情&#xff0c;看着车窗外的车水马龙&#xff0c;想到接下来的漫漫长路&#xff0c;立马就想打盹。 但正所谓行车不规范&#xff0c;亲人两行泪。所以为了让能够安全并快乐的从A点到B点。司机和主机厂都会给车里配备一些娱乐系统&am…

群晖DS Video支持DTS音轨(最新解决方案)

目录 一、前言 二、实现 1、下载ffmpeg的DTS支持包 2、安装ffmpeg 3、使用新的ffmpeg覆盖默认版本 4、开启DTS支持 5、可能存在的问题与解决办法 三、惯例 一、前言 最近突然在网上找到了一篇文件提供了DTS音轨的支持方法。于是去尝试了一下&#xff0c;居然真行。于是…

用tsMuxeR GUI给ts视频添加音轨

收藏比赛的都应该知道&#xff0c;高清的直播流录制了后一般是ts或者mkv封装&#xff0c;前者用tsMuxeR GUI可以对视频音频轨进行操作&#xff0c;后者用mkvtoolnix&#xff0c;两者都是无损操作。 至于其他格式就不考虑了&#xff0c;随便用其他的工具&#xff0c;因为本身是有…

Android多媒体(一) 音轨合成 我用双手成就你的梦想

近期需要做音轨合成这样一个功能&#xff0c;何为音轨合成&#xff0c;说白了就是N个音频文件合成一个&#xff0c;同时播放N个声音。然而网上各种找代码&#xff0c;并没有一个能用的&#xff0c;最后终于找到一个外国大神写的合音工具类&#xff0c;稍加修改便成了自己的东西…

FFMpeg 实现从视频中提取音轨

近期由于项目需要&#xff0c;要实现以下功能&#xff1a;将视频中的音轨提取出来&#xff0c;也就是只保留音频部分&#xff0c;以便于后期对于声音的处理。 我选择的工具是 FFMpeg &#xff0c;环境&#xff1a;win7 首先&#xff0c;从官网上下载FFMpeg的文件包&#xff0c;…

html5音轨的提取,(图文)mkv音轨提取软件 如何提取mkv中的音轨

很多人都知道&#xff0c;MKV是个“组合”和“封装”的格式&#xff0c;换句话说就是一种容器格式。最大的特点就是能容纳多种不同类型编码的视频、音频及字幕流。现在流行的高清电影一般都是MKV格式&#xff0c;里面可能包含有多个音轨&#xff0c;方便我们在播放视频时选择需…

html5音轨字幕,(图解)如何修改mkv默认音轨和字幕

平常有下载一些MKV双语电影在家里看,一般播放时电影默认播放外语加中文字幕,不过家里老爸老妈听不懂外语,所以每次看片时我还要手动切换音轨变成国语的。要是可以修改mkv的默认音轨或字幕就好了,于是我就找出了以下修改mkv默认音轨和字幕的解决方案,顺便分享一下,也许能帮…

前后端分离------后端创建笔记(上)

本文章转载于【SpringBootVue】全网最简单但实用的前后端分离项目实战笔记 - 前端_大菜007的博客-CSDN博客 仅用于学习和讨论&#xff0c;如有侵权请联系 源码&#xff1a;https://gitee.com/green_vegetables/x-admin-project.git 素材&#xff1a;https://pan.baidu.com/s/…

提取视频多音轨: 魔力玄(Medlexo) V9.7 (2023-01-31更新)

软件名称 (Windows) 魔力玄(Medlexo) 一句简介 假如一个视频144mb&#xff0c;改后缀等于没转换。如果真正提取音轨&#xff0c;就能得到没转换过的原音轨&#xff0c;大小可能才4mb。 这是一个ffmpeg 以及 yt-dlp 图形化软件&#xff0c;这个工具大小3mb&#xff0c;可以一…

yama搜集的超超…全的下载音效的网站,持续更新

yama有时间就会更新搜集的音效下载网站&#xff0c;哼唧~ 1.Find Sounds.com 一个免费网站&#xff0c;用于在网上查找声音效果。它是一个网络搜索引擎。提供强大的功能&#xff0c;简单易用&#xff0c;我们平时找音频资源的时候就可以到这个网站找你需要的资源&#xff0c;…

RTT(RT-Thread)线程间同步(保姆级)

目录 线程间同步 信号量 信号量结构体 信号量的使用和管理 动态创建信号量 实例 静态创建信号量 初始化和脱离信号量 获取信号量 信号量的互斥操作 获取信号量函数 释放信号量 信号量同步实例 互斥量&#xff08;互斥锁&#xff09; 互斥量的使用和管理 动态创…