flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

flink sql 实战实例 及延伸问题

  • Flink SQL 计算用户分布
  • Flink SQL 计算 DAU
  • 多topic 数据更新mysql topic接入mysql
    • 引入 upsert-kafka-connector 以1.14.4版本为例
  • 数据倾斜问题:
  • 让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?
  • 多维高阶聚合
  • FlinkSql Upsert 与 Primary Key
  • flinksql Hive 流批一体
    • Streaming Sink
    • Streaming Source
    • Hive Dialect
    • Filesystem Connector

Flink SQL 计算用户分布

⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。

- 如果需要可以打开 minibatch
select  level, count(1) as uv, max(time) as time
from (select uid, level, time, row_number() over (partition by uid order by time desc) rn from source
) tmp
where rn =1 
group by level

Flink SQL 计算 DAU

⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。

SELECT  window_start, window_end, platform, sum(bucket_dau) as dau
from (SELECTwindow_start, window_end, platform, count(distinct uid) as bucket_dauFROM TABLE(CUMULATE(TABLE user_log,DESCRIPTOR(time),INTERVAL '60' SECOND, INTERVAL '1' DAY))GROUP BY                                  window_start, window_end, platform, MOD(HASH_CODE(user_id), 1024)
) tmp
GROUP by   window_start, window_end, platform优点:如果是曲线图的需求,可以完美回溯曲线图。
缺点:大窗口之间如果有数据乱序,有丢数风险;并且由于是 watermark 推动产出,所以数据产出会有延迟。或或或或或或或或或或或或或或或或或或或或或或或
-- 如果需要可以打开 minibatch
select platform, count(1) as dau, max(time) as time
from (select uid, platform, time, row_number() over (partition by uid, platform, time / 24 / 3600 / 1000 order by time desc) rnfrom source
) tmp
where rn = 1
group byplatform优点:计算快。缺点:任务发生 failover,曲线图不能很好回溯。没法支持 cube 计算。或或或或或或或或或或或或或或或或或或或或或或或
-- 如果需要可以打开 minibatch
SELECT   max(time) as time, platform, sum(bucket_dau) as dau
from (SELECTmax(time) as time, platform, count(distinct uid) as bucket_dauFROM sourceGROUP BYplatform, MOD(HASH_CODE(user_id), 1024)
) t 
GROUP by   platform优点:计算快,支持 cube 计算。
缺点:任务发生 failover,曲线图不能很好回溯。

多topic 数据更新mysql topic接入mysql

-- 作业开发逻辑-- mysql -h数据库 -ubigdata_rw -pe20ycoy3yp09qij0kj8ngpcgxyywgmc9-- -Dyarn.application.queue=stream_data   -Dyarn.provided.lib.dirs=/streamx/flink/flink-1.12.5/lib/
CREATE TABLE Direction_Wind_create_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_create_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_like_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_like_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_comment_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_comment_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_share_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_share_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_create_sink (feed_id VARCHAR,user_id VARCHAR,feed_type VARCHAR,time_stamp timestamp(3),area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_create_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.user_id as user_id,cast(properties.feed_type as varchar) as feed_type--  ,TO_DATE(properties.`timestamp`) as time_stamp,TO_TIMESTAMP(FROM_UNIXTIME(cast(properties.`timestamp` as bigint))) as time_stamp,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_create_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_like_sink (feed_id VARCHAR,like_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_like_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.like_count  as like_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_like_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_comment_sink (feed_id VARCHAR,comment_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_comment_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.comment_count  as comment_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_comment_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_share_sink (feed_id VARCHAR,use_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_share_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.share_count  as   use_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_share_source
) where rowNum = 1
;
这里要注意,如果去重直接用group by的方式,在批处理中还好,流式处理中,这部分数据会存放到内容中,并且越积越大,没有ttl,时间一长就会oom了,
Flink SQL Deduplicate 写法,row_number partition by user_id order by proctime asc,此 SQL 最后生成的算子只会在第一条数据来的时候更新 state,后续访问不会更新 state TTL,因此 state 会在用户设置的 state TTL 时间之后过期。  所以按理说 这种去重方式 不会百分百 有用,只能保持一段时间的 去重,感觉是不对的,正在测试中。

经过测试 在flink 1.12 版本时,flinksql的upsert into 功能 ,也就是 这种写法
在这里插入图片描述
是可以实现 update 功能的,但 必须要 group by 数据才行,并且要求把 把所有select 语句中的字段 都加入到 group by 语句,但这么写,又会导致 state 不停增大,过一段时间就会 OOM

引入 upsert-kafka-connector 以1.14.4版本为例

基本工作机制:

  • source:

在这里插入图片描述

  • sink:
    在这里插入图片描述
tenv.executeSql("CREATE TABLE upsert_kafka ("+ "province STRING, "+ "pv BIGINT, "+ "PRIMARY KEY (province) NOT ENFORCED"+ ") WITH ("+ "'connector' = 'upsert-kafka', "+ "'topic' = 'upsert_kafka2', "+ "'properties.bootstrap.servers' = 'doitedu:9092', "+ "'key.format' = 'csv', "+ "'value.format' = 'csv'"+ ")"
);DataStreamSource<Row> stream = env.fromElements(Row.ofKind(RowKind.INSERT, "sx", 1),Row.ofKind(RowKind.INSERT, "sx", 2),Row.ofKind(RowKind.INSERT, "gx", 1),Row.ofKind(RowKind.INSERT, "sx", 2),Row.ofKind(RowKind.INSERT, "gx", 2)
);tenv.createTemporaryView("s", stream, Schema.newBuilder().column("f0", DataTypes.STRING().notNull()).column("f1", DataTypes.INT()).build());
// 将查询结果(changelog 流),写入 kafka
tenv.executeSql("insert into upsert_kafka select f0, sum(f1) as pv from s group by f0");

写入的数据为
在这里插入图片描述

// 从 kafka 再读出上面的 changelog 结果 tenv.executeSql(" select * from upsert_kafka").print();
tenv.executeSql("select * from upsert_kafka").print();

读出的数据为
在这里插入图片描述

数据倾斜问题:

⭐ 场景:拿计算直播间的同时在线观看用户数来说,大 v 直播间的人数会比小直播间的任务多几个量级,因此如果计算一个直播间的数据需要注意这种业务数据倾斜的特点
⭐ 解决方案:计算这种数据时,我们可以先按照直播间 id 将数据进行打散,如下 SQL 案例所示(DataStream 也是相同的解决方案),内层打散,外层合并:

select id, sum(bucket_uv) as uv
from (select id, count(distinct uid) as bucket_uv from source group byid, mod(uid, 1000) -- 将大 v 分桶打散
)
group by id

⭐ 数据任务处理时参数\代码处理逻辑导致倾斜:
⭐ 场景:比如有时候虽然用户已经按照 key 进行分桶计算,但是【最大并发度】设置为 150,【并发度】设置为 100,会导致 keygroup 在 sub-task 的划分不均匀(其中 50 个 sub-task 的 keygroup 为 2 个,剩下的 50 个 sub-task 为 1 个)导致数据倾斜。
⭐ 解决方案:设置合理的【最大并发度】【并发度】,【最大并发度】最好为【并发度】的倍数关系,比如【最大并发度】1024,【并发度】512
⭐ 我已经设置【数据分桶打散】+【最大并发为并发 n 倍】,为啥还出现数据倾斜?
⭐ 场景:你的【数据分桶】和【最大并发数】之间可能是不均匀的。因为 Flink 会将 keyby 的 key 拿到之后计算 hash 值,然后根据 hash 值去决定发送到那个 sub-task 去计算。这是就有可能出现你的【数据分桶】key 经过 hash 计算完成之后,并不能均匀的发到所有的 keygroup 中。比如【最大并发数】4096,【数据分桶】key 只有 1024 个,那么这些数据必然最多只能到 1024 个 keygroup 中,有可能还少于 1024,从而导致剩下的 3072 个 keygroup 没有任何数据
⭐ 解决方案:其实可以利用【数据分桶】key 和【最大并行度】两个参数,在 keyby 中实现和 Flink key hash 选择 keygroup 的算法一致的算法,在【最大并发数】4096,【数据分桶】为 4096 时,做到分桶值为 1 的数据一定会发送到 keygroup1 中,2 一定会发到 keygroup2 中,从而缓解数据倾斜。

最大并行度的设置:最大并行度可以自己设置,也可以框架默认生成;默认的算法是取当前算子并行度的 1.5 倍和 2 的 7 次方比较,取两者之间的最大值,然后用上面的结果和 2 的 15 次方比较,取其中的最小值为默认的最大并行度,非常不建议自动生成,建议用户自己设置。

让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?

在这里插入图片描述
⭐ 有提到将用户上线标记为 1,下线标记为 0 的,然后将上线下线数据发到消息队列用实时计算引擎统计的

⭐ 有提到将用户心跳日志借助 Session Window Dynamic Gap 计算的

在这里插入图片描述

多维高阶聚合

在这里插入图片描述

FlinkSql Upsert 与 Primary Key

在flink1.11 及以后,flinksql 与blink 做了merge 所以有重大变更
流计算的一个典型场景是把聚合的数据写入到 Upsert Sink 中,比如 JDBC、HBase,当遇到复杂的 SQL 时,时常会出现:
在这里插入图片描述
UpsertStreamTableSink 需要上游的 Query 有完整的 Primary Key 信息,不然就直接抛异常。这个现象涉及到 Flink 的 UpsertStreamTableSink 机制。顾名思义,它是一个更新的 Sink,需要按 Key 来更新,所以必须要有 Key 信息。

如何发现 Primary Key?一个方法是让优化器从 Query 中推断,如下图发现 Primary Key 的例子。

这种情况下在简单 Query 当中很好,也满足语义,也非常自然。但是如果是一个复杂的 Query,比如聚合又 Join 再聚合,那就只有报错了。不能期待优化器有多智能,很多情况它都不能推断出 PK,而且,可能业务的 SQL 本身就不能推断出 PK,所以导致了这样的异常。

在这里插入图片描述
怎么解决问题?Flink 1.11 彻底的抛弃了这个机制,不再从 Query 来推断 PK 了,而是完全依赖 Create table 语法。比如 Create 一个 jdbc_table,需要在定义中显式地写好 Primary Key(后面 NOT ENFORCED 的意思是不强校验,因为 Connector 也许没有具备 PK 的强校验的能力)。当指定了 PK,就相当于就告诉框架这个Jdbc Sink 会按照对应的 Key 来进行更新。如此,就跟 Query 完全没有关系了,这样的设计可以定义得非常清晰,如何更新完全按照设置的定义来。

CREATE TABLE jdbc_table (id BIGINT,...PRIMARY KEY (id) NOT ENFORCED
)

flinksql Hive 流批一体

首先看传统的 Hive 数仓。一个典型的 Hive 数仓如下图所示。一般来说,ETL 使用调度工具来调度作业,比如作业每天调度一次或者每小时调度一次。这里的调度,其实也是一个叠加的延迟。调度产生 Table1,再产生 Table2,再调度产生 Table3,计算延时需要叠加起来
在这里插入图片描述
问题是慢,延迟大,并且 Ad-hoc 分析延迟也比较大,因为前面的数据入库,或者前面的调度的 ETL 会有很大的延迟。Ad-hoc 分析再快返回,看到的也是历史数据。

所以现在流行构建实时数仓,从 Kafka 读计算写入 Kafka,最后再输出到 BI DB,BI DB 提供实时的数据服务,可以实时查询。Kafka 的 ETL 为实时作业,它的延时甚至可能达到毫秒级。实时数仓依赖 Queue,它的所有数据存储都是基于 Queue 或者实时数据库,这样实时性很好,延时低。但是:

第一,基于 Queue,一般来说就是行存加 Queue,存储效率其实不高。
第二,基于预计算,最终会落到 BI DB,已经是聚合好的数据了,没有历史数据。而且 Kafka 存的一般来说都是 15 天以内的数据,没有历史数据,意味着无法进行 Ad-hoc 分析。所有的分析全是预定义好的,必须要起对应的实时作业,且写到 DB 中,这样才可用。对比来说,Hive 数仓的好处在于它可以进行 Ad-hoc 分析,想要什么结果,就可以随时得到什么结果。
在这里插入图片描述
能否结合离线数仓和实时数仓两者的优势,然后构建一个 Lambda 的架构?

核心问题在于成本过高。无论是维护成本、计算成本还是存储成本等都很高。并且两边的数据还要保持一致性,离线数仓写完 Hive 数仓、SQL,然后实时数仓也要写完相应 SQL,将造成大量的重复开发。还可能存在团队上分为离线团队和实时团队,两个团队之间的沟通、迁移、对数据等将带来大量人力成本。如今,实时分析会越来越多,不断的发生迁移,导致重复开发的成本也越来越高。少部分重要的作业尚可接受,如果是大量的作业,维护成本其实是非常大的。

如何既享受 Ad-hoc 的好处,又能实现实时化的优势?一种思路是将 Hive 的离线数仓进行实时化,就算不能毫秒级的实时,准实时也好。所以,Flink 1.11 在 Hive 流批一体上做了一些探索和尝试,如下图所示。它能实时地按 Streaming 的方式来导出数据,写到 BI DB 中,并且这套系统也可以用分析计算框架来进行 Ad-hoc 的分析。这个图当中,最重要的就是 Flink Streaming 的导入。
在这里插入图片描述

Streaming Sink

早期 Flink 版本在 DataStreaming 层,已经有一个强大的 StreamingFileSink 将流数据写到文件系统。它是一个准实时的、Exactly-once 的系统,能实现一条数据不多,一条数据不少的 Sink。
在这里插入图片描述
具体原理是基于两阶段提交:

第一阶段:SnapshotPerTask,关闭需要 Commit 的文件,或者记录正在写的文件的 Offset。
第二阶段:NotifyCheckpointComplete,Rename 需要 Commit 的文件。注意,Rename 是一个原子且幂等的操作,所以只要保证 Rename 的 At-least-once,即可保证数据的 Exactly-once。

这样一个 File system 的 Writer 看似比较完美了。但是在 Hive 数仓中,数据的可见性是依赖 Hive Metastore 的,那在这个流程中,谁来通知 Hive Metastore 呢?
在这里插入图片描述
SQL 层在 StreamingFileSink,扩展了 Partition 的 Committer。

相当于不仅要进行 File 的 Commit,还要进行 Partition 的 Commit。如图所示,FileWriter 对应之前的 StreamingFileSink,它提供的是 Exactly-once 的 FileWriter。而后面再接了一个节点 PartitionCommitter。支持的 Commit Policy 有:

  • 内置支持 Add partition 到 Hive metastore;
  • 支持写 SuccessFile 到文件系统当中;
  • 并且也可以自定义 Committer,比如可以 analysis partition、合并 partition 里面的小文件。

Committer 挂在 Writer 后, 由 Commit Trigger 决定什么时机来 commit :

  • 默认的 commit 时机是,有文件就立即 commit。因为所有 commit 都是可重入的,所以这一点是可允许的。

  • 另外,也支持通过 partition 时间和 Watermark 来共同决定的。比如小时分区,如果现在时间到 11 点,10 点的分区就可以 commit 了。Watermark 保证了作业当前的准确性。

Streaming Source

Hive 数仓中存在大量的 ETL 任务,这些任务往往是通过调度工具来周期性的运行,这样做主要有两个问题:

  • 实时性不强,往往调度最小也是小时级。
  • 流程复杂,组件多,容易出现问题。

针对这些离线的 ETL 作业,Flink 1.11 为此开发了实时化的 Hive 流读,支持:

  • Partition 表,监控 Partition 的生成,增量读取新的 Partition。
  • 非 Partition 表,监控文件夹内新文件的生成,增量读取新的文件。

甚至可以使用 10 分钟级别的分区策略,使用 Flink 的 Hive streaming source 和 Hive streaming sink ,可以大大提高 Hive 数仓的实时性到准实时分钟级,在实时化的同时,也支持针对 Table 全量的 Ad-hoc 查询,提高灵活性。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'=’true’, 'streaming-source.consume-start-offset'='2020-05-20') */;/*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.partition.include' = 'latest','streaming-source.partition-order' = 'create-time','streaming-source.monitor-interval' = '1 h') */

另外除了 Scan 的读取方式,Flink 1.11 也支持了 Temporal Join 的方式,也就是以前常说的 Streaming Dim Join。

SELECTo.amout, o.currency, r.rate, o.amount * r.rate
FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency

目前支持的方式是 Cache All,并且是不感知分区的,比较适合小表的情况。

Hive Dialect

Flink SQL 遵循的是 ANSI-SQL 的标准,而 Hive SQL 有它自己的 HQL 语法,它们之间的语法、语义都有些许不同。

如何让 Hive 用户迁移到 Flink 生态中,同时避免用户太大的学习成本?为此, Flink SQL 1.11 提供了 Hive Dialect,可以使得用户在 Flink 生态中使用 HQL 语言来计算。目前只支持 DDL,后续版本会逐步攻坚 Qeuries。

Filesystem Connector

Hive Integration 提供了一个重量级的集成,功能丰富,但是环境比较复杂。如果只是想要一个轻量级的 Filesystem 读写呢?

Flink table 在长久以来只支持一个 CSV 的 Filesystem Table,并且还不支持 Partition,行为上在某些方面也有些不符合大数据计算的直觉。

Flink 1.11 重构了整个 Filesystem connector 的实现:

  • 结合 Partition,现在,Filesystem connector 支持 SQL 中 Partition 的所有语义,支持 Partition 的 DDL,支持 Partition Pruning,支持静态 / 动态 Partition 的插入,支持 Overwrite 的插入。
  • 支持各种 Formats: ■ CSV ■ JSON ■ Aparch AVRO ■ Apache Parquet ■ Apache ORC
  • 支持 Batch 的读写。
  • 支持 Streaming sink,也支持 Partition commit,支持写 Success 文件。

用几句简单的 SQL,不用搭建 Hive 集成环境即可:

  • 启动一个流作业写入 Filesystem 中,然后在 Hive 端即可查询到 Filesystem 上的数据,相比之前 Datastream 的作业,简单 SQL 即可搞定离线数据的入库。
  • 通过 Filesystem Connector 来查询 Hive 数仓中的数据,功能没有 Hive 集成那么全,但是定义简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2809039.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

1 easy 88. 合并两个有序数组

方法1: Arrays.sort //给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 // // 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺序 排列。 // // 注意&am…

Linux环境下C语言实现ping命令

Linux环境下C语言实现ping命令 涉及的知识点 Linux信号量的使用 SIGALRM信号是操作系统中的其中一个信号。他的作用是设置进程隔多久后会收到一个SIGALRM信号 #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <signal.h> …

SSM---Mybatis查询数据库的功能

Mybatis查询数据库的功能流程&#xff1a; 在maven中加入mybatis依赖&#xff0c;mysql驱动依赖创建一张student表创建表对应的实体类&#xff1a;student类&#xff0c;用来保存表中的每行数据创建持久层的DAO接口&#xff0c;用来定义操作数据库的方法创建这个表对应的sql映…

前端发送请求,明明登录进去了,为什么获取用户信息不行,后端总是识别不到token——跨域的问题

跨域问题 今天在对接前后端的时候&#xff0c;发现明明系统的登录接口都是好的&#xff0c;但是偏偏就是获取不到用户的信息&#xff0c;后端总是报错说读取不到有效的token。 总是说请求头中读取的token是null 在经过不断的排查和上网需求帮助的时候&#xff0c;我总结了以下…

JavaWeb——007MYSQL(DQL多表设计)

# 数据库开发-MySQL 一级目录二级目录三级目录 1. 数据库操作-DQL1.1 介绍1.2 语法1.3 基本查询1.4 条件查询1.5 聚合函数1.6 分组查询1.7 排序查询1.8 分页查询1.9 案例1.9.1 案例一1.9.2 案例二 2. 多表设计2.1 一对多2.1.1 表设计2.1.2 外键约束 2.2 一对一2.3 多对多2.4 案…

ROS2高效学习第四章 -- ros2 topic 编程之自定义 msg

ros2 topic 编程之自定义 msg 1 前言和资料2 正文2.1 两种自定义 msg 方式的讨论2.2 自定义 msg 独立存在2.2.1 自定义 msg 包&#xff08;diy_interface&#xff09;2.2.2 pubsub_cpp 收发自定义 msg2.2.3 pubsub_py 收发自定义 msg 2.3 自定义 msg 放在模块包里&#xff08;p…

逻辑思维1000题丨【Exercise 1】解析

目录 声明 解析 声明 解析网上是搜不到的&#xff0c;100%原创。本专栏只会讲解重难点题目&#xff0c;简单题目不做讲解。 解析 2.What is the missing number? 1 12 13 124 15 1236 17 1248 139 12510 111 1234612 113 12714 13515 &#xff1f; 观察每个数字的末尾分别是…

Redis高并发分布锁实战

Redis高并发分布锁实战 问题场景 场景一: 没有捕获异常 // 仅仅加锁 // 读取 stock15 Boolean ret stringRedisTemplate.opsForValue().setIfAbsent("lock_key", "1"); // jedis.setnx(k,v) // TODO 业务代码 stock-- stringRedisTemplate.delete(&quo…

HarmonyOS服务卡片开发指导(Stage模型)概述

服务卡片概述 服务卡片&#xff08;以下简称“卡片”&#xff09;是一种界面展示形式&#xff0c;可以将应用的重要信息或操作前置到卡片&#xff0c;以达到服务直达、减少体验层级的目的。卡片常用于嵌入到其他应用&#xff08;当前卡片使用方只支持系统应用&#xff0c;如桌…

Android Gradle 开发与应用 (一) : Gradle基础

1. Gradle是什么 Gradle是一个通用的构建工具&#xff0c;支持诸多主要的 IDE&#xff0c;包括 Android Studio、IntelliJ IDEA、Visual Studio 等 Gradle 的底层实现(核心引擎和框架)其实是用 Java 编写的开发者通常使用 Groovy 或 Kotlin 来编写构建脚本 1.1 那么为什么Gra…

你真的了解@Async吗?

你真的了解Async吗&#xff1f; 使用场景&#xff1a; 开发中会碰到一些耗时较长或者不需要立即得到执行结果的逻辑&#xff0c;比如消息推送、商品同步等都可以使用异步方法&#xff0c;这时我们可以用到Async。但是直接使用 Async 会有风险&#xff0c;当我们没有指定线程池…

速度提高100倍 - 扩展 RAG 应用程序,以实现数十亿个嵌入,并行计算余弦相似度

原文链接&#xff1a;100x Faster — Scaling Your RAG App for Billions of Embeddings 2024 年 2 月 15 日 RAG应用程序最大的问题之一是它们的计算检索时间。想象一下&#xff0c;你有一个向量数据库&#xff0c;包含一万亿条Embedding向量的记录。当您尝试将用户查询与一…

数字化转型导师坚鹏:政府数字化转型智慧城市类案例研究

政府数字化转型智慧城市类案例研究 课程背景&#xff1a; 很多地方政府存在以下问题&#xff1a; 不清楚政府数字化转型的智慧城市类成功案例 不清楚政府数字化转型的城市大脑类成功案例 不清楚政府数字化转型的综合实践类成功案例 课程特色&#xff1a; 针对性强 …

LDRA Testbed软件静态分析_软件质量度量

系列文章目录 LDRA Testbed软件静态分析_操作指南 LDRA Testbed软件静态分析_自动提取静态分析数据生成文档 LDRA Testbed软件静态分析_Jenkins持续集成_(1)自动进行静态分析的环境搭建 LDRA Testbed软件静态分析_Jenkins持续集成_(2)配置邮件自动发送静态分析结果 LDRA Testb…

LeetCode--代码详解 236. 二叉树的最近公共祖先

236. 二叉树的最近公共祖先 题目 给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个节点 p、q&#xff0c;最近公共祖先表示为一个节点 x&#xff0c;满足 x 是 p、q 的祖先且 x 的深度尽可能大&am…

python 循环语句 while 循环

while循环 Python 编程中 while 语句用于循环执行程序&#xff0c;即在某条件下&#xff0c;循环执行某段程序&#xff0c;以处理需要重复处理的相同任务。其基本形式为&#xff1a; while 判断条件(condition)&#xff1a; 执行语句(statements)…… 执行语句可以是单个语句…

遥感影像目标检测:从CNN(Faster-RCNN)到Transformer(DETR)

我国高分辨率对地观测系统重大专项已全面启动&#xff0c;高空间、高光谱、高时间分辨率和宽地面覆盖于一体的全球天空地一体化立体对地观测网逐步形成&#xff0c;将成为保障国家安全的基础性和战略性资源。未来10年全球每天获取的观测数据将超过10PB&#xff0c;遥感大数据时…

【数据结构】周末作业

1.new(struct list_head*)malloc(sizeof(struct list_head*)); if(newNULL) { printf("失败\n"); return; } new->nextprev->next; prev->nextnew; return; 2.struct list_head* pprev->next; prev->nextp->next; p->next->prevpr…

Oracle conn / as sysdba遇到ORA-01031: insufficient privileges错误

背景 oracle 突然挂了&#xff0c;处于锁定状态&#xff0c;然后打算重新启动一下子。 遂 sqlplus /nolog conn / as sysdba 然后就出现了以下错误。。 ORA-01031: insufficient privileges 1.查了一圈&#xff0c;有说是 计算机 》 管理》本地用户和组》组》ORA_DBA&am…

python 3.11中安装sympy(符号工具包)

1.python环境&#xff1a; 2.安装遇到问题&#xff1a; … 3.升级pip cmd命令行中&#xff0c;执行如下命令&#xff1a; python.exe -m pip installl --upgrade pip 4.再次安装sympy cmd命令行中&#xff0c;执行如下命令&#xff1a; pip install sympy 5.简单应用 对…