Flink on k8s之historyServer

1.Flink HistoryServer用途
HistoryServer可以在Flink 作业终止运行(Flink集群关闭)之后,还可以查询已完成作业的统计信息。此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。

2.部署Flink HistoryServer
1、创建 flink historyserver pvc,保存Flink作业归档数据。

[root@k8s-demo001 ~]# cat flink-historyserver-pvc.yaml
#Flink Historyserver 持久化存储pvc
apiVersion: v1
kind: PersistentVolumeClaim
metadata:name: flink-historyserver-pvc  # historyserver pvc名称namespace: flink   # 指定归属的名命空间
spec:storageClassName: nfs-storage   #sc名称,更改为实际的sc名称accessModes:- ReadWriteMany   #采用ReadWriteMany的访问模式resources:requests:storage: 1Gi    #存储容量,根据实际需要更改
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver-pvc.yaml

2、配置flink historyserver,创建flink historyserver configmap

[root@k8s-demo001 ~]# cat flink-historyserver-conf.yaml
kind: ConfigMap
apiVersion: v1
metadata:name: flink-historyserver-confnamespace: flinkannotations:kubesphere.io/creator: admin
data:flink-conf.yaml: |blob.server.port: 6124kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:2kubernetes.jobmanager.replicas: 1kubernetes.jobmanager.cpu: 1.0$internal.flink.version: v1_13kubernetes.taskmanager.cpu: 1.0jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122kubernetes.service-account: flinkkubernetes.cluster-id: flink-historyserverkubernetes.container.image: flink-hdfs:1.13.6parallelism.default: 2kubernetes.namespace: flinktaskmanager.numberOfTaskSlots: 2kubernetes.rest-service.exposed.type: ClusterIPkubernetes.operator.reconcile.interval: 15 skubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTEkubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactoryjobmanager.memory.process.size: 1024mtaskmanager.memory.process.size: 1024mkubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypointkubernetes.pod-template-file: /tmp/flink_op_generated_podTemplate_17272077926352838674.yamlexecution.target: kubernetes-sessionjobmanager.archive.fs.dir: file:///opt/flink/flink_historyhistoryserver.archive.fs.dir: file:///opt/flink/flink_historyhistoryserver.archive.fs.refresh-interval: 10000historyserver.web.port: 8082web.tmpdir: /opt/flink/webuploadweb.upload.dir: /opt/flink/webuploadweb.cancel.enable: falseinternal.cluster.execution-mode: NORMALqueryable-state.proxy.ports: 6125state.checkpoints.dir: file:///opt/flink/checkpointslog4j.properties: |# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.monitorInterval=30# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.file.ref = MainAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFOlogger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3logger.shaded_zookeeper.level = INFO# Log all infos in the given fileappender.main.name = MainAppenderappender.main.type = RollingFileappender.main.append = trueappender.main.fileName = ${sys:log.file}appender.main.filePattern = ${sys:log.file}.%iappender.main.layout.type = PatternLayoutappender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.main.policies.type = Policiesappender.main.policies.size.type = SizeBasedTriggeringPolicyappender.main.policies.size.size = 100MBappender.main.policies.startup.type = OnStartupTriggeringPolicyappender.main.strategy.type = DefaultRolloverStrategyappender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFFlog4j-console.properties: |# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF# Flink Deployment Logging Overrides# rootLogger.level = DEBUG
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver-conf.yaml

检查
在这里插入图片描述
3、创建Historyserver服务

[root@k8s-demo001 ~]# cat flink-historyserver.yaml
apiVersion: apps/v1
kind: Deployment
metadata:namespace: flinklabels:app: flink-historyservername: flink-historyservername: flink-historyserver
spec:replicas: 1selector:matchLabels:name: flink-historyservertemplate:metadata:namespace: flinklabels:app: flink-historyservername: flink-historyserverspec:hostAliases:  # hosts配置- ip: "172.16.252.129"hostnames:- "Kafka-01"- ip: "172.16.252.130"hostnames:- "Kafka-02"- ip: "172.16.252.131"hostnames:- "Kafka-03"containers:- name: flink-historyserverenv:- name: TZvalue: Asia/Shanghaiimage: flink:1.13.6command: [ 'sh','-c','/docker-entrypoint.sh history-server' ]ports:- containerPort: 8082volumeMounts:- name: flink-historyserver-confmountPath: /opt/flink/conf/flink-conf.yamlsubPath: flink-conf.yaml- name: flink-historyserver-confmountPath: /opt/flink/conf/log4j.propertiessubPath: log4j.properties- name: flink-historyserver-confmountPath: /opt/flink/conf/log4j-console.propertiessubPath: log4j-console.properties- name: flink-historyservermountPath: /opt/flink/flink_historyvolumes:  # 挂载卷配置- name: flink-historyserver-confconfigMap:name: flink-historyserver-conf- name: flink-historyserverpersistentVolumeClaim:claimName: flink-historyserver-pvc
# ---
# kind: Service
# apiVersion: v1
# metadata:
#   namespace: flink
#   name: flink-historyserver
# spec:
#   type: NodePort
#   ports:
#     - port: 8082
#       nodePort: 31082
#   selector:
#     name: flink-historyserver# ingress按实际情况配置
---
apiVersion: v1
kind: Service
metadata:labels:app: flink-historyservername: flink-historyservername: flink-historyservernamespace: flink
spec:selector:app: flink-historyserverports:- port: 8082protocol: TCPtargetPort: 8082
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:namespace: flinkname: flink-historyserverannotations:nginx.ingress.kubernetes.io/default-backend: ingress-nginx-controllernginx.ingress.kubernetes.io/use-regex: 'true'
spec:ingressClassName: nginxrules:- host: "flink-hs.k8s.io"http:paths:- pathType: Prefixpath: "/"backend:service:name: flink-historyserverport:number: 8082
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver.yaml

验证:
在这里插入图片描述
访问Flink UI:
http://flink-hs.k8s.io/
在这里插入图片描述
3.提交flink作业
1、编写提交作业的yaml

这里需要挂在Historyserver的pvc,并配置Historyserver的归档路径到pvc挂载路径

[root@k8s-demo001 ~]# cat application-deployment-checkpoint-ha-hs.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:namespace: flinkname: application-deployment-checkpoint-ha-hs  # flink 集群名称
spec:image: flink:1.13.6  # flink基础镜像flinkVersion: v1_13  # flink版本,选择1.13imagePullPolicy: IfNotPresent  # 镜像拉取策略,本地没有则从仓库拉取ingress:   # ingress配置,用于访问flink web页面template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"className: "nginx"annotations:nginx.ingress.kubernetes.io/rewrite-target: "/$2"flinkConfiguration:taskmanager.numberOfTaskSlots: "2"state.checkpoints.dir: file:///opt/flink/checkpointshigh-availability.type: kuberneteshigh-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory # JobManager HAhigh-availability.storageDir: file:///opt/flink/flink_recovery  # JobManager HA数据保存路径jobmanager.archive.fs.dir: file:///opt/flink/flink_history      # JobManager 归档路径historyserver.archive.fs.dir: file:///opt/flink/flink_history      # Historyserver 归档路径historyserver.archive.fs.refresh-interval: "10000"              # Historyserver 文件刷新间隔serviceAccount: flinkjobManager:replicas: 2  # HA下, jobManger的副本数要大于1resource:memory: "1024m"cpu: 1taskManager:resource:memory: "1024m"cpu: 1podTemplate:spec:hostAliases:- ip: "172.16.252.129"hostnames:- "Kafka-01"- ip: "172.16.252.130"hostnames:- "Kafka-02"- ip: "172.16.252.131"hostnames:- "Kafka-03"containers:- name: flink-main-containerenv:- name: TZvalue: Asia/ShanghaivolumeMounts:- name: flink-jar  # 挂载nfs上的jarmountPath: /opt/flink/jar- name: flink-checkpoints  # 挂载checkpoint pvcmountPath: /opt/flink/checkpoints- name: flink-log  # 挂载日志 pvcmountPath: /opt/flink/log- name: flink-ha    # HA pvc配置mountPath: /opt/flink/flink_recovery- name: flink-historyservermountPath: /opt/flink/flink_historyvolumes:- name: flink-jarpersistentVolumeClaim:claimName: flink-jar-pvc- name: flink-checkpointspersistentVolumeClaim:claimName: flink-checkpoint-application-pvc- name: flink-logpersistentVolumeClaim:claimName: flink-log-pvc- name: flink-hapersistentVolumeClaim:claimName: flink-ha-pvc- name: flink-historyserverpersistentVolumeClaim:claimName: flink-historyserver-pvcjob:jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar # 使用pv方式挂载jar包entryClass: org.fblinux.StreamWordCountWithCPargs:   # 传递到作业main方法的参数- "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"- "flink_test"- "172.16.252.113"- "3306"- "flink_test"- "wc"- "file:///opt/flink/checkpoints"- "10000"- "1"parallelism: 1upgradeMode: stateless
[root@k8s-demo001 ~]# kubectl apply -f application-deployment-checkpoint-ha-hs.yaml

作业提交之后,可以手动往Kafka 写入一些数据,然后关闭作业
作业运行中historyserver是没有信息的,作业终止后history service才会查询到相关信息

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2778348.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

音视频/流媒体协议和编码汇总

一、流媒体协议 1. RTMP/RTMPT/RTMPS/RTMPE 等多变种 是应用层协议,使用TCP作为底层传输协议,并提供了低延迟、高带宽利用率和实时性的特点。 (1)RTMP协议是Adobe的私有协议,未完全公开 (2)一般传输的是 flv,f4v 格式流 2. RTP/RTCP/SRTP …

基础面试题整理7之Redis

1.redis持久化RDB、AOF RDB(Redis database) 在当前redis目录下生成一个dump.rdb文件,对redis数据进行备份 常用save、bgsave命令进行数据备份: save命令会阻塞其他redis命令,不会消耗额外的内存,与IO线程同步;bgsav…

云卷云舒:论超级数据库、算网数据库、智算数据库

笔者大胆提出一种“超级数据库”的概念设想。 一、超级能力 就像当初提出“超级计算机”一样,我们是否同样可以提出“超级数据库”的概念呢?当然不是不可以。 二、超级计算机 我们回忆一下“超级计算机”的发展之路,大致经过了如下几个环…

C++入门篇(4)—— 类与对象(1)

目录 1.类的引入 2.类的定义 3.类的访问限定符 4.类的作用域 5. 类对象的存储方式 6. this指针 6.1 this指针的引入 6.2 this指针的特性 6.3有意思的面试题 1.类的引入 C语言struct 结构体中只能定义变量,而C中可以定义函数。 struct Date {void Init(int…

VitePress-14- 配置-titleTemplate 的作用详解

作用描述 1、titleTemplate 是标题的后缀&#xff1b;2、可以自定义标题的后缀&#xff1b;3、可以自定义整个的标题以及后缀&#xff0c;语法如下&#xff1a; titleTemplate: :title 链接符号 自己定义的后缀 【:title】&#xff1a;从页面的第一个 <h1> 标题推断出的…

记录一次centos 使用selenium运行环境

这里写自定义目录标题 宝塔面板 安装 selenium安装google-chrome 宝塔面板 安装 selenium 安装google-chrome yum install https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm 查看chrome版本 google-chrome --version 下载对应chrome版本的chro…

python实现基数排序

如果在给不同的整形数组排序的时候,一般会这样做,也就是先看最高位,如果最高位数值大的话也就意味着它的数值是最大的,而如果两个数字的最高位的数值是一样的,则继续比较次高位,这样依次去比较可以决定数字的排序。而对于基数排序来说,其思想是与以上的思想是不同的,基…

开源!免费!Hugging Face推出GPT商城

Hugging Face发布开源AI助手制造工具&#xff0c;与OpenAI的定制GPT形成竞争 Hugging Face今年1月31日推出一款开源AI代码库——Hugging Chat Assistants&#xff0c;允许用户轻松创建特定功能的定制AI聊天机器人。 不同于OpenAI的ChatGPT商城需要每月20美金成为会员才能使用…

【MySQL】字符串函数的学习

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” #mermaid-svg-J7VN4RbrBi51ozap {font-family:"trebuchet ms",verdana,arial,sans-serif;font-siz…

【Larry】英语学习笔记语法篇——从句=连词+简单句

目录 三、从句连词简单句 1、必须有连词 主从结构 疑问词的词性 2、名词性从句 同位语从句 形式主语 形式宾语 that的省略 3、形容词性从句&#xff08;上&#xff09; 关系代词 关系词的作用 介词前置问题 4、形容词性从句&#xff08;中&#xff09; 定语关系…

visual studio和cmake如何编译dlib库

官网 dlib C Library 对应的是最新版本&#xff0c;只能用到vs2015版本及以后 如果使用vs2013&#xff0c;所以需要下载vs2013可用的版本。 就是说dlib版本与vs版本有对应关系 所有版本 dlib C Library - Browse /dlib at SourceForge.net Releases davisking/dlib GitHu…

如何在Linux上部署1Panel运维管理面板并实现无公网ip远程访问

文章目录 前言1. Linux 安装1Panel2. 安装cpolar内网穿透3. 配置1Panel公网访问地址4. 公网远程访问1Panel管理界面5. 固定1Panel公网地址 前言 1Panel 是一个现代化、开源的 Linux 服务器运维管理面板。高效管理,通过 Web 端轻松管理 Linux 服务器&#xff0c;包括主机监控、…

数据结构——5.5 树与二叉树的应用

5.5 树与二叉树的应用 概念 结点的权&#xff1a;大小可以表示结点的重要性 结点的带权路径长度&#xff1a;从树的根到该结&#xff0c;的路径长度&#xff08;经过的边数&#xff09;与该结点权的乘积 树的带权路径长度&#xff1a;树中所有叶结点的带权路径长度之和(WPL) …

给定长度为n的01串s,有两种操作:1、交换相邻的两个字符,花费为1e12;2、删除一个字符,花费为1e12 + 1,求使s不递减的最少花费

题目 思路&#xff1a; #include <bits/stdc.h> using namespace std; #define int long long #define pb push_back #define fi first #define se second #define lson p << 1 #define rson p << 1 | 1 const int maxn 1e6 5, inf 1e12, maxm 4e4 5, …

反序列化漏洞——PHP原生类

Error类 PHP>7.0&#xff0c;因为存在__toString&#xff0c;可以进行XSS Exception类 因为存在__toString&#xff0c;可以进行XSS DirectoryIterator类 因为存在__toString&#xff0c;可以获取符合要求的第一个文件名 SplFileObject类 因为存在__toString&#xff0c…

Python 数据可视化之山脊线图 Ridgeline Plots

文章目录 一、前言二、主要内容三、总结 &#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 一、前言 JoyPy 是一个基于 matplotlib pandas 的单功能 Python 包&#xff0c;它的唯一目的是绘制山脊线图 Joyplots&#xff08;也称为 Ridgeline Plots&…

滑块验证码识别代码分享

平时我们开发爬虫会遇到各种各样的滑动验证码&#xff0c;如下图所示&#xff1a; 为了解决这个问题&#xff0c;我写了一个通用的滑块验证码识别代码&#xff0c;主要是分析图片&#xff0c;然后计算出滑块滑动的像素距离。但是像素距离大多数情况下都不会等于滑动距离&#x…

记:STM32F4参考手册-存储器和总线架构

STM32F4参考手册-存储器和总线架构 目录 STM32F4参考手册-存储器和总线架构 系统架构 AHB/APB总线桥&#xff08;APB&#xff09; 存储器组织结构 存储器映射 SRAM概述 Flash概述 位段 自举配置 嵌入式自举程序 物理重映射 系统架构 主系统由32位多层AHB总线矩阵构…

cximage在vs2013下使用方法

1.下载源码 Cximage源码官网 CxImage download | SourceForge.net 下载最新版本 702版本 Download cximage702_full.7z (CxImage) 2.编译 vs2013打开CxImageFull_vc10.sln 这个源码版本是vc10的版本&#xff0c;所以vs2013会自动更新项目 因为cximage需要在后面的项目中使…

零基础学python之高级编程(1)---面向对象编程及其类的创建

面向对象编程及其类的创建 文章目录 面向对象编程及其类的创建前言一、面向过程编程和面向对象编程的概念1.面向过程编程(Procedural Programming)2.面向对象编程(Object-Oriented Programming&#xff0c;OOP) 二、面向对象编程基础1.初识类(class)和对象调用方法 2.类中的两种…