Flink从入门到实践(一):Flink入门、Flink部署

文章目录

  • 系列文章索引
  • 一、快速上手
    • 1、导包
    • 2、求词频demo
      • (1)要读取的数据
      • (2)demo1:批处理(离线处理)
      • (3)demo2 - lambda优化:批处理(离线处理)
      • (4)demo3:流处理(实时处理)
      • (5)总结:实时vs离线
      • (6)demo4:批流一体
      • (7)对接Socket
  • 二、Flink部署
    • 1、Flink架构
    • 2、Standalone部署
    • 3、自运行flink-web
    • 4、通过参数传递
    • 5、通过webui提交job
    • 6、停止作业
    • 7、常用命令
    • 8、集群
  • 参考资料

系列文章索引

Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC

一、快速上手

1、导包

<!-- fink 相关依赖 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version>
</dependency>

2、求词频demo

注意!自Flink 1.18以来,所有Flink DataSet api都已弃用,并将在未来的Flink主版本中删除。您仍然可以在DataSet中构建应用程序,但是您应该转向DataStream和/或Table API。

(1)要读取的数据

定义data内容:

pk,pk,pk
ruoze,ruoze
hello

(2)demo1:批处理(离线处理)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** 使用Flink进行批处理,并统计wc*** 结果:* (bye,2)* (hello,3)* (hi,1)*/
public class BatchWordCountApp {public static void main(String[] args) throws Exception {// step0: Spark中有上下文,Flink中也有上下文,MR中也有ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// step1: 读取文件内容  ==> 一行一行的字符串而已DataSource<String> source = env.readTextFile("data/wc.data");// step2: 每一行的内容按照指定的分隔符进行拆分  1:Nsource.flatMap(new FlatMapFunction<String, String>() {/**** @param value 读取到的每一行数据* @param out 输出的集合*/@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {// 使用,进行分割String[] splits = value.split(",");for(String split : splits) {out.collect(split.toLowerCase().trim());}}}).map(new MapFunction<String, Tuple2<String,Integer>>() {/**** @param value 每一个元素 (hello, 1)(hello, 1)(hello, 1)*/@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}).groupBy(0)  // step4: 按照单词进行分组  groupBy是离线的api,传下标.sum(1)  // ==> 求词频 sum,传下标.print(); // 打印}
}

(3)demo2 - lambda优化:批处理(离线处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** lambda表达式优化*/
public class BatchWordCountAppV2 {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> source = env.readTextFile("data/wc.data");/*** lambda语法: (参数1,参数2,参数3...) -> {函数体}*/
//        source.map(String::toUpperCase).print();// 使用了Java泛型,由于泛型擦除的原因,需要显示的声明类型信息source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).groupBy(0).sum(1).print();}
}

(4)demo3:流处理(实时处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 流式处理* 结果:* 8> (hi,1)* 6> (hello,1)* 5> (bye,1)* 6> (hello,2)* 6> (hello,3)* 5> (bye,2)*/
public class StreamWCApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.readTextFile("data/wc.data");source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握!流式的并没有groupBy,而是keyBy!根据第一个值进行sum.sum(1).print();// 需要手动开启env.execute("作业名字");}
}

(5)总结:实时vs离线

离线:结果是一次性出来的。
实时:来一个数据处理一次,数据是带状态的。

(6)demo4:批流一体

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 采用批流一体的方式进行处理*/
public class FlinkWordCountApp {public static void main(String[] args) throws Exception {// 统一使用StreamExecutionEnvironment这个执行上下文环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 选择处理方式 批/流/自动DataStreamSource<String> source = env.readTextFile("data/wc.data");source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握.sum(1).print();// 执行env.execute("作业名字");}
}

(7)对接Socket

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 使用Flink对接Socket的数据并进行词频统计** 大数据处理的三段论: 输入  处理  输出**/
public class FlinkSocket {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/*** 数据源:可以通过多种不同的数据源接入数据:socket  kafka  text** 官网上描述的是 env.addSource(...)** socket的方式对应的并行度是1,因为它来自于SourceFunction的实现*/DataStreamSource<String> source = env.socketTextStream("localhost", 9527);System.out.println(source.getParallelism());// 处理source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握.sum(1)// 数据输出.print();  // 输出到外部系统中去env.execute("作业名字");}
}

二、Flink部署

1、Flink架构

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/
Flink是一个分布式的带有状态管理的计算框架,可以运行在常用/常见的集群资源管理器上(YARN、K8S)。

一个JobManager(协调/分配),一个或多个TaskManager(工作)。
在这里插入图片描述
在这里插入图片描述

2、Standalone部署

按照官网下载执行即可:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation/

可以根据官网来安装,需要下载、解压、安装。
也可以使用docker安装。

启动之后,localhost:8081就可以访问管控台了。

3、自运行flink-web

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>1.18.0</version>
</dependency>
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8082); // 指定web端口,开启webUI,不写的话默认8081
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 新版本可以直接使用getExecutionEnvironment(conf)

以上亲测并不好使……具体原因未知,设置为flink1.16版本或许就好用了。

4、通过参数传递

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 通过参数传递进来Flink引用程序所需要的参数,flink自带的工具类
ParameterTool tool = ParameterTool.fromArgs(args);
String host = tool.get("host");
int port = tool.getInt("port");DataStreamSource<String> source = env.socketTextStream(host, port);
System.out.println(source.getParallelism());

可以通过命令行参数:–host localhost --port 8765

5、通过webui提交job

在这里插入图片描述
在这里插入图片描述

6、停止作业

在这里插入图片描述

7、常用命令

# 查看作业列表
flink list -a  # 所有
flink list -r  # 正在运行的
# 停止作业
flink cancel <jobid># 提交job
# -c,--class <classname> 指定main方法
# -C,--classpath <url> 指定classpath
# -p,--parallelism <paralle> 指定并行度
flink run -c com.demo.FlinkDemo FlinkTest.jar 

8、集群

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/#flink-application-execution

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/

单机部署Session Mode和Application Mode:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/

k8s:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/native_kubernetes/

YARN:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/

参考资料

https://flink.apache.org/
https://nightlies.apache.org/flink/flink-docs-stable/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2775574.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

python视频播放列表信息库之m3u8使用详解

m3u8库是什么&#xff1f; m3u8是一个用于解析和操作M3U8文件的Python库。M3U8文件&#xff0c;是指使用UTF-8编码格式的M3U文件&#xff0c;它们通常用于播放列表文件&#xff0c;尤其是在HTTP Live Streaming&#xff08;HLS&#xff09;中。简单来说&#xff0c;m3u8库能帮…

HiveSQL——条件判断语句嵌套windows子句的应用

注&#xff1a;参考文章&#xff1a; SQL条件判断语句嵌套window子句的应用【易错点】--HiveSql面试题25_sql剁成嵌套判断-CSDN博客文章浏览阅读920次&#xff0c;点赞4次&#xff0c;收藏4次。0 需求分析需求&#xff1a;表如下user_idgood_namegoods_typerk1hadoop1011hive1…

假期刷题打卡--Day27

1、MT1217矩阵乘法 输入3X4整型矩阵A和4X3的整型矩阵B&#xff0c;计算A*B&#xff0c;放到矩阵C里面&#xff0c;输出矩阵C。 格式 输入格式&#xff1a; 分两行输入两个矩阵&#xff0c;空格分隔。 输出格式&#xff1a; 按矩阵形式输出&#xff0c;整型&#xff0c;每…

[算法前沿]--059-大语言模型Fine-tuning踩坑经验之谈

前言 由于 ChatGPT 和 GPT4 兴起,如何让人人都用上这种大模型,是目前 AI 领域最活跃的事情。当下开源的 LLM(Large language model)非常多,可谓是百模大战。面对诸多开源本地模型,根据自己的需求,选择适合自己的基座模型和参数量很重要。选择完后需要对训练数据进行预处…

基于微信小程序的校园二手交易平台

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

双指针算法 判断子序列

判断子序列 1.C代码实现: #include<iostream> using namespace std; const int N100002; int a[N],b[N];int main(){int n,m;cin>>n>>m; // 输入n和m的值for(int i0;i<n;i){cin>>a[i]; // 输入数组a的元素}for(int j0;j<m;j){cin>>b[j]; …

MYSQL存储过程(含入参、出参)

1、创建库存表语句 -- eladmin.t_stock definitionCREATE TABLE t_stock (id bigint(20) NOT NULL AUTO_INCREMENT,quantity bigint(20) NOT NULL,PRIMARY KEY (id) ) ENGINEInnoDB AUTO_INCREMENT4101 DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_bin; id为主键&#xff0c;便于…

CTFshow web(php命令执行 45-49)

基础知识&#xff1a; 1.绕过cat使用&#xff1a; tac more less head tac tail nl od(二进制查看) vi vim sort uniq rev 2.绕过空格用&#xff1a; %09 <> ${IFS} $IFS$ {cat,fl*} %20 注&#xff1a; %09 ##&#xff08;Tab&#xff09; %20 ##&#xff08;spa…

项目02《游戏-12-开发》Unity3D

基于 项目02《游戏-11-开发》Unity3D &#xff0c; 任务&#xff1a;实现场景怪物自动巡航 &#xff0c; 首先在场景中创建小球命名为路径点WayPoint0&#xff0c; 取消小球的碰撞器Collider&#xff0c; 再复制两个改名为WayPoint1 和 WayPoint2 &#xff0c; 在…

Spring Boot 笔记 003 Bean注册

使用Idea导入第三方jar包 在porn.xml种添加的第三方jar包依赖&#xff0c;并刷新 可以在启动类中尝试调用 以上放到启动类中&#xff0c;不推荐&#xff0c;建议创建一个专门定义的类 package com.geji.config;import cn.itcast.pojo.Country; import cn.itcast.pojo.Province;…

树与二叉树---数据结构

树作为一种逻辑结构&#xff0c;同时也是一种分层结构&#xff0c;具有以下两个特点&#xff1a; 1&#xff09;树的根结点没有前驱&#xff0c;除根结点外的所有结点有 且只有一个前驱。 2&#xff09;树中所有结点可以有零个或多个后继。 树结点数据结构 满二叉树和完全二…

Spinnaker多云持续交付平台: 部署Minio存储服务

目录 一、实验 1.环境 2.K8S storage节点部署NFS 3.K8S 动态创建PV 4.K8S master节点部署HELM3 4.K8S master节点部署Minio存储服务&#xff08;第一种方式安装&#xff09; 5.Minio客户端安装MC命令 6.K8S master节点使用Docker 部署Minio存储服务&#xff08;第二种方…

云游戏发行需要哪些条件

云游戏是一种创新性的游戏服务模式&#xff0c;将游戏运算和渲染等处理任务移至云端服务器&#xff0c;通过互联网实时传输画面和操作指令&#xff0c;使玩家能够在低端终端设备上也能流畅玩游戏。要做云游戏发行&#xff0c;需要考虑一系列条件&#xff0c;包括技术、基础设施…

幻兽帕鲁服务器创建私服教程(新版教程更简单)

幻兽帕鲁官方服务器不稳定&#xff1f;自己搭建幻兽帕鲁服务器&#xff0c;低延迟、稳定不卡&#xff0c;目前阿里云和腾讯云均推出幻兽帕鲁专用服务器&#xff0c;腾讯云直接提供幻兽帕鲁镜像系统&#xff0c;阿里云通过计算巢服务&#xff0c;均可以一键部署&#xff0c;鼠标…

表单标记(html)

前言 发现input的type属性还是有挺多的&#xff0c;这里把一些常用的总结一下。 HTML 输入类型 (w3school.com.cn)https://www.w3school.com.cn/html/html_form_input_types.asp text-文本 文本输入,如果文字太长&#xff0c;超出的部分就不会显示。 定义供文本输入的单行…

项目学习记录

项目开发 创建项目环境配置关联git新增模块项目启动打印地址日志使用httpclient进行idea内部控制台测试使用AOP拦截器打印日志 创建项目 创建一个空项目&#xff0c;并勾选下面选项 然后进入pom.xml中修改项目配置 根据这个链接选则&#xff0c;修改项目的支持版本 链接&#…

《MySQL 简易速速上手小册》第5章:高可用性和灾难恢复(2024 最新版)

文章目录 5.1 构建高可用性 MySQL 解决方案5.1.1 基础知识5.1.2 重点案例&#xff1a;使用 Python 构建高可用性的电子商务平台数据库5.1.3 拓展案例 5.2 数据备份策略和工具5.2.1 基础知识5.2.2 重点案例&#xff1a;使用 Python 实现 MySQL 定期备份5.2.3 拓展案例 5.3 灾难恢…

Mybatis是否支持延迟加载?

前言 随着互联网应用的不断发展&#xff0c;数据库访问成为了应用开发中的一个重要环节。在这个背景下&#xff0c;MyBatis作为一种优秀的持久层框架&#xff0c;提供了灵活的SQL映射配置和强大的功能&#xff0c;为开发者提供了便捷的数据库访问解决方案。本文将深入探讨MyBat…

宋小黑原创高清壁纸分享之蓝白云海

大家好&#xff0c;我是小黑&#xff0c;最近迷上了制作壁纸&#xff0c;哈哈&#xff0c;给大家分享一波&#xff0c;小黑做的美图~ 本期给大家分享的是&#xff0c;小黑原创的蓝白云海主题系统壁纸~ 厌倦了一成不变的壁纸吗&#xff1f; 感到学习负担过重吗&#xff1f; …

OpenCV-33 开运算和闭运算

目录 一、开运算 二、闭运算 三、形态学梯度 开运算和闭运算都是腐蚀和膨胀的基本应用。 一、开运算 开运算 腐蚀膨胀(腐蚀之后再膨胀) 开运算提供了另一种去除噪声的思路。&#xff08;腐蚀先进行去噪&#xff0c;膨胀再还原图像&#xff09; 通过API --- morphologyE…