[flink 实时流基础]源算子和转换算子

文章目录

    • 1. 源算子 Source
        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据
    • 2. 转换算子
        • 基本转换算子(map/ filter/ flatMap)


1. 源算子 Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));// 2. 直接填元素DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);source.print();env.execute();}
2. 从文件读取
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/world.txt")).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource").print();env.execute();}
3. 从 socket 读取
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 7777);source.print();env.execute();}

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
5. 从数据生成器读取数据
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>
 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}}, 10, // 自动生成的数字序列RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条Types.STRING // 返回类型);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}

2. 转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
image.png

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
image.png
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
image.png
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2906878.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

基于springboot+vue实现的校企合作项目管理系统

作者主页&#xff1a;Java码库 主营内容&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app等设计与开发。 收藏点赞不迷路 关注作者有好处 文末获取源码 技术选型 【后端】&#xff1a;Java 【框架】&#xff1a;spring…

远程过程调用-buttonrpc源码解析5-函数绑定

前面几节的文章里提到过“通过一定的技术手段&#xff0c;通知远程服务器调用某个函数来执行相应的功能”&#xff0c;本片文章分析服务端如何使用std::bind函数&#xff0c;统一函数调用形式。 1、先看一个简单的例子&#xff1a; 温馨提示&#xff1a;std::bind除了绑定普通…

职场沟通教训 程序汪改了一行代码,导致测试和开发大战

本文章有视频的&#xff0c;请到B站 我是程序汪 观看 程序汪改了一行代码&#xff0c;导致测试和开发大战&#xff0c;职场沟通教训 程序汪改了一行代码&#xff0c;导致测试和开发大战 鸡汤文 每个人都会在沟通上出问题 工作上沟通出问题可能让你郁闷一天、丢了客户、损失金…

Termius for Mac/Win:多协议远程管理利器,你的工作效率提升神器

在数字化飞速发展的今天&#xff0c;远程管理已成为企业运营和个人工作不可或缺的一部分。而Termius&#xff0c;作为一款多协议远程管理软件&#xff0c;正以其卓越的性能和便捷的操作&#xff0c;成为广大用户的心头好。 Termius支持多种协议&#xff0c;无论是SSH、RDP还是…

计数器的原理和应用

一、计数器的原理和应用 要求&#xff1a;每计数三次&#xff0c;数码管值加一 #include<reg51.h> unsigned char s[]{0x3F,0x06,0x5B,0x4F,0x66,0x6D,0x7D,0x07,0x7F,0x6F}; unsigned char num0; void initcounter() {TMOD0x06;//0000 0110TH0256-3;TL0256-3;ET01;EA1;T…

函数指针的运用

这段代码使用了函数指针&#xff0c;实现了根据用户输入的命令选择不同的操作&#xff0c;并对两个数进行相应的处理。以下是代码的总结&#xff1a; getMax, getSmall 和 getSum 函数分别用于获取两个数中的较大值、较小值和它们的和。 dataHandler 函数接收两个数据 data 和…

unity学习(77)--多玩家信息交互--不同类型的数据包

明白各个数据包的作用&#xff0c;以及是否正确的发挥作用 1.“120包”&#xff0c;客户端登录时发给服务器的&#xff0c;服务器处理后返回“121包”。 2.“121包” &#xff0c;服务器返回给客户端的&#xff0c;包含登录时所有在线玩家的信息。 客户端也通过createPlayer函…

王道C语言督学营OJ课后习题(课时14)

#include <stdio.h> #include <stdlib.h>typedef char BiElemType; typedef struct BiTNode{BiElemType c;//c 就是书籍上的 datastruct BiTNode *lchild;struct BiTNode *rchild; }BiTNode,*BiTree;//tag 结构体是辅助队列使用的 typedef struct tag{BiTree p;//树…

Machine Learning机器学习之数据可视化

目录 前言 一、 数据预处理与清洗 二、常见可视化技术 三、可视化工具和平台 博主介绍&#xff1a;✌专注于前后端、机器学习、人工智能应用领域开发的优质创作者、秉着互联网精神开源贡献精神&#xff0c;答疑解惑、坚持优质作品共享。本人是掘金/腾讯云/阿里云等平台优质作者…

【娱乐】战双帕弥什游戏笔记攻略

文章目录 Part.I IntroductionChap.I Information Part.II 新手攻略Chap.I 角色和武器挑选Chap.II 新手意识推荐 Part.II 阵容搭配Chap.I 一拖二Chap.II 毕业队 Reference Part.I Introduction 2019年12月5日全平台公测。 偶然间入坑战双&#xff0c;玩了几天&#xff0c;觉得…

unity小:使用Unity FBX Exporter 将 3DMax场景或者模型无损导入Unity

本指南旨在帮助您顺利安装和配置Unity FBX Exporter插件&#xff0c;并解决相关的常见问题。 安装 FBX Exporter 下载并安装FBX Exporter插件。 打开Unity&#xff0c;选择 Edit > Project Settings > Fbx Export。 点击 Install Unity Integration 并选择3ds Max的插…

2020年30米二级分类北京市土地利用数据

引言 北京市省土地利用数据产品是指基于Landsat TM/ETM/OLI遥感影像&#xff0c;采用遥感信息提取方法&#xff0c;并结合野外实测&#xff0c;以及参照国内外现有的土地利用/土地覆盖分类体系&#xff0c;经过波段选择及融合&#xff0c;图像几何校正及配准并对图像进行增强处…

代码随想录算法训练营 DAY 24 | 回溯理论基础 77.组合 + 剪枝优化

回溯理论 回溯法就是递归函数&#xff0c;纯暴力搜索 解决的问题 组合&#xff08;无顺序&#xff09; 1 2 3 4 给出大小为2的所有组合 切割字符串 子集问题 1 2 3 4&#xff0c;子集有1 2 3 4,12,13,14&#xff0c;…123 124… 排列&#xff08;有顺序&#xff09; 棋盘…

平台产品线 | 高频问题更新(2024.3.25)

平台产品线 | 高频问题更新(2024.3.25) 一、SuperMap iServer 问题1&#xff1a;请教一个问题&#xff0c;我们项目上iServer启动不了&#xff0c;日志报错是许可问题吗&#xff1f;我们刚刚更新的许可&#xff1f; 11.1.1 【问题原因】SQLITE BUSY The database file is l…

consul集群部署三server一client

环境&#xff1a; consul&#xff1a;consul_1.16.2_linux_amd64.zip centos7.9 server:192.168.50.154 192.168.50.155 192.168.50.156 client:192.168.70.64 安装目录&#xff1a; [rootrabbit4-64 consul]# pwd /app/consul [rootrabbit4-64 consul]# ls consul consul_1…

兆欧表揭秘:到底是摇表还是电器?

兆欧表&#xff0c;又称摇表&#xff0c;是一种用于测量电气设备、电缆、电机绕组等绝缘电阻的测试工具。虽然现代兆欧表采用电动型和电池供电等多种形式&#xff0c;但其基本功能和用途保持一致。早期的兆欧表多采用手动机械式设计&#xff0c;通过手柄摇动发电来提供所需的高…

YOLOv9改进策略 :block优化 | MobileViTAttention自注意力,更小、更轻、精度更高 ,性能优于MobileNetV3等

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文改进内容&#xff1a;现有博客都是将MobileViT作为backbone引入YOLO&#xff0c;因此存在的问题点是训练显存要求巨大&#xff0c;因此本文引入自注意力(ViTs)&#xff1a;MobileViTAttention&#xff0c;从而实现高效涨点 &#…

岭师大数据技术原理与应用-序章-软工版

HeZaoCha-CSDN博客 序章—软工版 一、环境介绍1. VMware Workstation Pro2. CentOS3. Java4. Hadoop5. HBase6. MySQL7. Hive 二、系统安装1. 虚拟网络编辑器2. 操作系统安装 三、结尾 先说说哥们写这系列博客的原因&#xff0c;本来学完咱也没想着再管部署这部分问题的说&…

HarmonyOS实战开发-实现自定义弹窗

介绍 本篇Codelab基于ArkTS的声明式开发范式实现了三种不同的弹窗&#xff0c;第一种直接使用公共组件&#xff0c;后两种使用CustomDialogController实现自定义弹窗&#xff0c;效果如图所示 相关概念 AlertDialog&#xff1a;警告弹窗&#xff0c;可设置文本内容和响应回调…

C语言查找-----------BF算法KMP算法

1.问题引入 有一个主字符串&#xff0c;有一个子字符串&#xff0c;要求我们寻找子字符串在主字符串里面开始出现的位置&#xff1b; 2.BF算法 BF算法就是暴力算法&#xff0c;这个做法虽然效率不高&#xff0c;但是按照我们传统的思路依然能够得到结果&#xff0c;接下来我们…