大数据-玩转数据-Flink窗口函数

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/1618517.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

软考:中级软件设计师:网络类型与拓扑结构,网络规划与设计,ip地址与子网划分,特殊含义的IP地址

软考&#xff1a;中级软件设计师:网络类型与拓扑结构 提示&#xff1a;系列被面试官问的问题&#xff0c;我自己当时不会&#xff0c;所以下来自己复盘一下&#xff0c;认真学习和总结&#xff0c;以应对未来更多的可能性 关于互联网大厂的笔试面试&#xff0c;都是需要细心准…

Qt入门教程【Core篇】Layout布局(布局管理器、手动布局)

&#x1f608;「编程小鱼酱秘密基地」&#xff1a;传送门 &#x1f608;「CSDN主页」&#xff1a;传送门 &#x1f608;「Bilibil首页」&#xff1a;传送门 &#x1f608;「网易云课堂」&#xff1a;传送门 &#x1f608;「CSDN学院」&#xff1a;传送门 &#x1f608;「51CTO学…

前端布局 Flex(弹性)布局

1. flex布局优点 操作方便&#xff0c;布局极为简单&#xff0c;移动端应用很广泛 pc端浏览器支持情况较差 IE11或者更低版本&#xff0c;不支持或仅部分支持 2. flex布局原理 flex意为“弹性布局”&#xff0c;用来为盒状模型提供最大的灵活性&#xff0c;任何一个容器都…

Java BorderLayout(边框布局)布局管理器

BorderLayout BorderLayout 将容器分为 EAST 、 SOUTH 、 WEST 、 NORTH 、 CENTER五个区域&#xff0c;普通组件可以被放置在这 5 个区域的任意一个中 。 BorderLayout布局 管理器的布局示意图如图所示 。 当改变使用 BorderLayout 的容器大小时&#xff0c; NORTH 、 SOUTH …

java:布局方法(网格布局)

网格布局 一、简单说明二、关键代码三、流程图四、例子说明1. 有17个“按钮”排列&#xff08;1&#xff09;源码A&#xff08;2&#xff09;运行效果 2. 有36个“按钮”排列&#xff08;1&#xff09;源码B&#xff08;2&#xff09;源码B运行效果 3. 有12个“按钮”排列&…

Grid布局介绍

1、什么是Grid布局 ​     Grid布局即网格布局&#xff0c;是一种新的css模型&#xff0c;一般是将一个页面划分成几个主要的区域&#xff0c;定义这些区域的大小、位置和层次等关系&#xff0c;是目前唯一一种css二维布局。 2、和flex布局的区别 ​     Grid布局和fle…

Java GridLayout(网格布局)布局管理器

GridLayout&#xff08;网格布局&#xff09; ​ GridLayout 布局管理器将容器分割成纵横线分隔的网格 &#xff0c; 每个网格所占的区域大小相同。当向使用 GridLayout 布局管理器的容器中添加组件时&#xff0c; 默认从左向右、 从上向下依次添加到每个网格中 。 与 FlowLay…

css经典布局——圣杯布局

圣杯布局和双飞翼布局一直是前端面试的高频考点&#xff0c;圣杯布局的出现是来自由 Matthew Levine 在 2006 年写的一篇文章 《In Search of the Holy Grail》。 比起双飞翼布局&#xff0c;它的起源不是源于对页面的形象表达。在西方&#xff0c;圣杯是表达“渴求之物”的意思…

CSS响应式布局(自适应布局)

CSS 响应式布局也称自适应布局&#xff0c;是 Ethan Marcotte 在 2010 年 5 月份提出的一个概念&#xff0c;简单来讲就是一个网站能够兼容多个不同的终端&#xff08;设备&#xff09;&#xff0c;而不是为每个终端做一个特定的版本。这个概念是为解决移动端浏览网页而诞生的。…

flex布局(详解)

目录 前言 一、何为Flex布局 二、基本概念 三、容器的属性 3.1 flex-direction属性 3.2 flex-wrap属性 3.3 flex-flow 3.4 justify-content属性 3.5 align-items属性 3.6 align-content属性 四、项目的属性 4.1 order属性 4.2 flex-grow属性 4.3 flex-shrink属性 …

Redis各类数据结构应用场景总结

Redis各类数据结构应用场景总结 引言String应用场景 List应用场景 Hash应用场景 Set应用场景 ZSet应用场景 小结 引言 实际面试过程中更多看重的是对Redis相关数据结构的活学活用&#xff0c;同时也可能会引申出Redis相关底层数据结构原理的实现&#xff0c;笔者最近面试过程中…

CSS基本布局——grid布局

grid布局简介&#xff1a; Grid布局是将容器划分成“行”和“列”&#xff0c;产生单元格&#xff0c;然后指定“项目所在”的单元格&#xff0c;可以看作是二维布局。 基本概念&#xff1a; 容器&#xff08;container&#xff09;——有容器属性项目&#xff08;items&…

【CSS布局】—— flex(弹性)布局

赶快悄悄的努力起来吧&#xff0c;不苒在这里衷心祝愿各位大佬都能顺利通过面试。 面试专栏分享&#xff0c;感觉有用的小伙伴可以点个订阅&#xff0c;不定时更新相关面试题&#xff1a;面试专栏 。 文章目录 &#x1f33c;前言&#x1f33b;正文1、弹性盒子模型是什么&#x…

火列星屯--最强大的CSS布局方案

一、总论 首先给出结论&#xff1a;网格布局&#xff08;Grid&#xff09;是最强大的 CSS 布局方案。 虽说如此&#xff0c;但是仍要重视具体的应用场景&#xff0c;如果有其他写法可以提高代码的简洁和可读性&#xff0c;未必非要选择Gird不可。 网格布局就如同它的字面意思…

常见的布局方式

1.弹性布局---flex布局 弹性布局是一种常见且好用的布局方式&#xff0c;它可以让结构中的内容自适应不同的分辨率&#xff0c;简化的代码的书写。 使用方法 给父盒子加上display:flex;默认子元素不换行&#xff0c;如果装不开&#xff0c;子元素会缩小元素的宽度。 父级元…

前端开发常见的几种布局方式

作为前端开发工程师&#xff0c;布局方式有多种&#xff0c;对于不同的场景可以使用不同的布局方式&#xff0c;那么我们就先来简单的来了解一下&#xff0c;那些前端开发必须了解的布局。 一. 静态布局&#xff08;static layout&#xff09; 即传统的Web布局&#xff0c;网页…

android系统启动流程之zygote(Native)启动分析

zygote有一部分运行在native,有一部分运行在java层&#xff0c;它是第一个进入java层的进程 zygote在启动时&#xff0c;在init.${ro.zygote}.rc脚本中&#xff0c;里面描述了zygote是如何被启动的&#xff0c; 当init进程解析到zygote.rc文件时&#xff0c;将根据解析出来的命…

No117.精选前端面试题,享受每天的挑战和学习

文章目录 断点续传怎么做的秒传怎么实现var let const 块级作用域ts Partial Omit 怎么实现的箭头函数有哪些限制箭头函数为什么不能作为构造函数promise常用apiMap和Object的区别vue怎么实现双向绑定 断点续传怎么做的 断点续传是指在文件下载或上传过程中&#xff0c;当连接…

【微信小程序遇到的坑】实现跨行跨列的表格

由于微信小程序组件中不带table标签&#xff0c;所以只能自己制作一个table表格&#xff0c;并且是实现跨行跨列的复杂表格。 直接上代码&#xff0c;即可预览效果 wxml <view class"table"><view class"tr tr_title">上午8:30-11:45</vi…

50.服务程序SERVICE_STATUS、SERVICE_STATUS_HANDLE、RegisterServiceCtrlHandler、SetServiceStatus、SERVICE_TABL

我得先总结一下步骤&#xff1a; 一、在main函数中的操作 1.先创建一个main主函数&#xff0c;在main主函数中创建创建一个服务程序入口函数列的结构体并将其初始化 机构体SERVICE_TABLE_ENTRY DispatchTable[2]; 假设定义的服务入口函数是(LPSERVICE_MAIN_FUNCTION)ServiceMa…