【大数据】Flink SQL 语法篇(四):Group 聚合

Flink SQL 语法篇(四):Group 聚合

  • 1.基础概念
  • 2.窗口聚合和 Group 聚合
  • 3.SQL 语义
  • 4.Group 聚合支持 Grouping sets、Rollup、Cube

1.基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

2.窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
)-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
);-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

3.SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

4.Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id         ),( supplier_id,             rating ),( supplier_id                     ),(              product_id, rating ),(              product_id         ),(                          rating ),(                                 )
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2809709.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

复制策略深入探讨

在之前的博客中,我们讨论了复制最佳实践和不同类型的复制,例如批量、站点和存储桶。但是,随着所有这些不同类型的复制类型的出现,人们不得不想知道在哪里使用哪种复制策略?从现有 S3 兼容数据存储迁移数据时&#xff0…

SV-6301 IP网络可视对讲报警柱简介

SV-6301 IP网络可视对讲报警柱简介 18123651365微信 功能特点: 1.全金属外壳,户外防风雨,坚固耐用,易于识别 2.单键呼叫,可通过软件指定呼叫目标,双向可视对讲广播喊话 3.终端内置扬声器和话筒眯头&…

GZ036 区块链技术应用赛项赛题第10套

2023年全国职业院校技能大赛 高职组 “区块链技术应用” 赛项赛卷(10卷) 任 务 书 参赛队编号: 背景描述 养老保险是对于老年人的最基本的生活保障。各种数据显示,当前的养老金市场规模庞大。2016年美国的养老金资…

SpringCloud-Docker原理解析

Spring Cloud和Docker的结合为微服务架构的部署和管理提供了强大的支持。本文深入剖析Spring Cloud与Docker的集成原理,从服务注册与发现、配置管理、负载均衡到容器化部署等方面展开详细解析。探讨Spring Cloud如何利用Docker容器技术实现服务的弹性伸缩&#xff0…

Linux系统中前后端分离项目部署指南

目录 一.nginx安装以及字启动 解压nginx 一键安装4个依赖 安装nginx 启动 nginx 服务 开放端口号 并且在外部访问 设置nginx自启动 二.配置负载均衡 1.配置一个tomact 修改端口号 8081端口号 2.配置负载均衡 ​编辑 三.部署前后端分离项目 1.项目部署后端 ​编辑…

SpringBoot3——核心特性——快速入门(三)

4、核心技能 4.1、常用注解 SpringBoot摒弃XML配置方式,改为全注解驱动 4.1.1、组件注册 Configuration、SpringBootConfiguration Bean、Scope Controller、 Service、Repository、Component Import ComponentScan 步骤: 1、Configuration 编写一个配置…

MFC由初值终值步长生成数值序列

matlab的冒号运算符可以生成数值序列; 下面来生成自己的数值序列; vc6新建一个对话框工程; 放几个控件;添加成员变量如下; void CMycolonDlg::OnButton1() {// TODO: Add your control notification handler code hereUpdateData(TRUE);double d1, d2;CString str1, …

Linux基础命令—进程管理

基础知识 linux进程管理 什么是进程 开发写代码->代码运行起来->进程 运行起来的程序叫做进程程序与进程区别 1.程序是一个静态的概念,主要是指令集和数据的结合,可以长期存放在操作系统中 2.进程是一个动态的概念,主要是程序的运行状态,进程存在生命周期,生命周期结…

YOLO算法改进Backbone系列之:EfficientViT

EfficientViT: Memory Effificient Vision Transformer with Cascaded Group Attention 摘要:视觉transformer由于其高模型能力而取得了巨大的成功。然而,它们卓越的性能伴随着沉重的计算成本,这使得它们不适合实时应用。在这篇论文中&#x…

ChatGPT/GPT4科研应用与AI绘图及论文高效写作

原文:ChatGPT/GPT4科研应用与AI绘图及论文高效写作 第一:2024年AI领域最新技术 1.OpenAI新模型-GPT-5 2.谷歌新模型-Gemini Ultra 3.Meta新模型-LLama3 4.科大讯飞-星火认知 5.百度-文心一言 6.MoonshotAI-Kimi 7.智谱AI-GLM-4 第二:…

【CT成像】VGSTUDIO MAX最小系统要求检查缺少支持OpenGL3.3的解决办法

【CT成像】VGSTUDIO MAX最小系统要求检查缺少支持OpenGL3.3的解决办法 1.背景2.分析3.解决办法4.资源 1.背景 我把自己的台式机电脑进行了VMware ESXi 虚拟化。 在vmware ESXi系统中安装了windows系统, 并在windows系统中安装了VGSTUDIO MAX软件。 在运行VGSTUDIO…

matlab一维二维和三维RBF插值方法

1、内容简介 略 50-可以交流、咨询、答疑 2、内容说明 略 3、仿真分析 略 matlab一维二维和三维RBF插值方法_哔哩哔哩_bilibili 4、参考论文 略

电商赠品数据可以监测吗

很多店铺为了引流,会在标题、图片上标明促销活动,常见的有赠品描述,由于现在监测价格技术的壁垒,其实很多时候,主图上的赠品信息因其描述方式、字体等的不同,会导致监测不出来的情况出现,这也给…

代码签名常见问题解答!!!

1、什么是代码签名证书? 代码签名证书(Code Signing Certificate)是受信任的证书颁发机构颁发给软件开发者,对其开发的可执行脚本、软件代码和内容进行数字签名的数字证书。代码签名证书用于验证开发者身份真实性、保护代码的完整…

【计算机网络】传输层——TCP和UDP详解

文章目录 一. TCP和UDP简介二. UDP 协议详解1. UDP报文格式2. UDP的使用场景 三. TCP 协议详解1. TCP报文格式2. TCP协议的重要机制确认应答(保证可靠传输的最核心机制)超时重传连接管理(三次握手、四次挥手)!&#xf…

微信小程序-宿主环境-开发文档学习笔记

查看更多学习笔记:GitHub:LoveEmiliaForever 微信小程序开发指南 微信小程序开发文档 渲染层和逻辑层 WXML 模板和 WXSS 样式工作在渲染层,JS 脚本工作在逻辑层 渲染层和数据相关。逻辑层负责产生、处理数据。逻辑层通过 Page 实例的 setD…

09 Redis之分布式系统(数据分区算法 + 系统搭建与集群操作)

6 分布式系统 Redis 分布式系统,官方称为 Redis Cluster,Redis 集群,其是 Redis 3.0 开始推出的分布式解决方案。其可以很好地解决不同 Redis 节点存放不同数据,并将用户请求方便地路由到不同 Redis 的问题。 什么是分布式系统?…

unity发布webGL压缩方式的gzip,使用nginx作为web服务器时的配置文件

unity发布webGL压缩方式的gzip,使用nginx作为web服务器时的配置文件 Unity版本是:2021.3 nginx的版本是:nginx-1.25.4 Unity发布webgl时的测试 设置压缩方式是gzip nginx配置文件 worker_processes 1;events {worker_connections 102…

Open CASCADE学习|提取面的内外环线

在Open CASCADE中,区分内环和外环主要基于面的参数域内环线方向的定义。具体来说,在面的参数域内,沿着环线正方向前进时,如果左侧为面内、右侧为面外,那么该环线被视为外环;反之,如果左侧为面外…

k8s Pod基础(概念,容器功能及分类,镜像拉取和容器重启策略)

目录 pod概念 Kubernetes设计Pod概念和特殊组成结构的用意 Pod内部结构: 网络共享: 存储共享: pause容器主要功能 pod创建方式 pod使用方式 pod分类 pod的容器分类 基础容器(infrastructure container)&…