RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警

心得的体会

刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警场景在RisingWave上做了验证,以下是自己的心得体会:

RisingWave架构简单,运维成本底,基于云原生(可以分别基于计算和存储动态伸缩),同时在开发上屏蔽了Flink等实时处理框架底层需要处理的一些技术细节(状态存储,数据一致性,分布式集群扩展等)。提供了与PostgreSQL兼容的标准SQL接口,用户可以像使用 PostgreSQL 一样处理数据流。并且RisingWave不单单可以处理流式数据,还提供了数据其他流式处理框架(如:Flink、storm)所不具备的数据存储能力,基本可以完全取代FlinkSQL。相对于其他OLAP系统(如:apache doris,starrocks),RisingWave采用同步实时,可以保证实时的新鲜度;强一致性,而不是最终一致性。用户需要做的仅仅是通过开发SQL就可以处理流数据,当然首先需要具备流式数据处理思维(相对于离线)。

RisingWave当然也有自身的不足,相对于Flink可以通过DataStream API自定义灵活的处理流式数据,RisingWave只能解决一些特定的流式场景,无法做太多定制开发;相对于Apache Doris等OLAP实时分析性数据库,RisingWave不适合做分析型随机查询。另外RisingWave是个新事物,正在发展阶段,周边生态和相关文档还不健全,作为尝鲜者可能会踩很多坑。然而令人欣慰的是RisingWave的社区回复还是很及时的,RisingWave官方投入了很多精力在做RisingWave的布道和答疑。

至于争论比较厉害的RisingWav VS FLink的性能和吞吐量上孰优孰劣,针对不同应用场景可能有不同表现,因此没有亲自调研就没有发言权。但我认为在不同的场景下他们应该有各自的优势。无论如何RisingWave部署简单,上手容易,试错成本低是一个不争的事实。RisingWave可以应用在一些数据看版,监控,实时指标等场景。

利用动态和时间过滤器实现监控告警

FlinkSQL解决不了定时触发的问题,FlinkSQL的流处理逻辑只是按event触发,不能按时间条件触发,也就是没有触发器机制。FlinkSQL窗口的定时触发,归根结底也是基于event触发,event驱动的机制。因此需要触发器的场景就需要用到Flink DataStream API的KeyedProcessFunction等算子。但RisingWave利用Dynamic filters 和 Temporal filters 可以间接实现类似场景的触发器机制。

场景描述

现有如下群消息实时指标监控场景:
数据有初始化(init)、查询(query)、回调(callback:succ+fail)三种先后顺序状态。
数据是按预设时间批次分组的,例如:2024-01-01 08:00:00、2024-01-01 08:30:00,实时统计每一个批次内三种不同状态的数据count。

监控指标一:在某一个批次延迟指定的时间(query_timeout)之内(例如:2024-01-01 08:00:00延迟1小时触发时间为系统时间2024-01-01 09:00:00),该批次的query状态数据count没有达到init状态的数量count阀值(即query_count<init_count*query_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count等

监控指标二:如果指标一告警没有被触发,该批次在满足query状态数据count达到init状态数量count的阀值(即query_count>=init_countquery_threshold)以后,在指定的延迟时间内(callback_timeout),该批次的callback状态数据count没有达到query状态的数量count阀值(即callback_count<query_countcallback_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count、callback_count等

群消息实时指标监控流程图如下:
群消息实时指标监控流程图

实例demo

RisingWave部署可以参考:RisingWave分布式SQL流处理数据库调研

假设:
query_threshold=1, callback_count=1
query_timeout= ‘5 minute’, callback_timeout= ‘1 minute’
0->init,1->query,2->callback

RisingWave SQL:

 
DROP TABLE t_msg;
CREATE TABLE t_msg(msg_id int,status smallint ,public_time timestamp,process_time timestamp as proctime()
) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
show timezone;select * from t_msg;--统计不同状态的count
DROP MATERIALIZED VIEW mv_t_msg_groupby; 
CREATE MATERIALIZED VIEW mv_t_msg_groupby AS
SELECT public_time
,sum(case when status = 0  then 1 else 0 end) AS init_count
,sum(case when status = 1  then 1 else 0 end) AS query_count
,sum(case when status = 2  then 1 else 0 end) AS callback_count
,max(process_time) as process_time
FROM t_msg 
group by public_time;select * from mv_t_msg_groupby;--sink_query_alarm 
DROP SINK sink_query_alarm;
CREATE SINK sink_query_alarm AS 
SELECT public_time
,init_count
,query_count
,process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   init_count*1 > query_count --query_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--由于RisingWave不支持在【MATERIALIZED VIEW】和【SINK】等【可伸缩流】中指定处理时间字段,因此需要借助外部存储kafka周转
--RisngWave官方给的解释:support a proctime on an append only stream might be easier but on retractable stream could take extra cost. We must think it carefully to introduce such a feature.--sink_query_succ 
DROP SINK sink_query_succ;
CREATE SINK sink_query_succ AS 
SELECT public_time
,init_count
,query_count
,callback_count
,process_time as query_succ_process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' >= now()   --query_timeout=1 minute, 在指定的时间内,【Delete and clean expired data】
and   init_count*1 <= query_count --query_threshold=1,query_count达到了指定值
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_succ'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--source连接器
DROP SOURCE source_query_succ;
CREATE SOURCE IF NOT EXISTS source_query_succ (init_count int,query_count int ,callback_count int ,public_time timestamp,query_succ_process_time timestamp
)
WITH (connector='kafka',topic='t_sink_query_succ',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='earliest', -- earliest ,latest,default:earliest 
) FORMAT PLAIN ENCODE JSON;select * from source_query_succ;--sink_callback_alarm,用到动态过滤器和时间过滤器
DROP SINK sink_callback_alarm;
CREATE SINK sink_callback_alarm AS 
WITH tmp AS ( 
select public_time, min(query_succ_process_time) as query_succ_process_time  -- 动态过滤器
FROM source_query_succ 
group by public_time
)
SELECT b.public_time
,b.init_count
,b.query_count
,b.callback_count
,b.process_time
,a.query_succ_process_time
FROM  tmp a
JOIN  mv_t_msg_groupby b ON a.public_time=b.public_time
where a.query_succ_process_time + INTERVAL '1 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   b.query_count*1 > b.callback_count --callback_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_callback_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);-- 模拟数据
--init
INSERT INTO t_msg values(1,0,'2024-02-23 15:55:00'::TIMESTAMP); --比当前系统时间早
INSERT INTO t_msg values(2,0,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,0,'2024-02-23 15:55:00'::TIMESTAMP);
--query                                
INSERT INTO t_msg values(1,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,1,'2024-02-23 15:55:00'::TIMESTAMP);
--callback                             
INSERT INTO t_msg values(1,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,2,'2024-02-23 15:55:00'::TIMESTAMP);

查看监控结果:

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_query_alarm -C 
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_callback_alarm -C 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2804865.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

【Docker】免费使用的腾讯云容器镜像服务

需要云服务器等云产品来学习Linux可以移步/-->腾讯云<--/官网&#xff0c;轻量型云服务器低至112元/年&#xff0c;新用户首次下单享超低折扣。 目录 1、设置密码 2、登录实例&#xff08;sudo docker login xxxxxx&#xff09; 3、新建命名空间&#xff08;每个命名空…

C# 1.消息队列MQ使用场景--图文解析

为什么使用消息队列MQ&#xff08;Message Queue&#xff09;&#xff1f; 消息队列有什么优点和缺点&#xff1f; Kafka(大数据日志采集)、ActiveMQ(最早的MQ--目前使用较少)、RabbitMQ(开源&#xff0c;中小型企业使用足够)、RocketMQ(阿里开发&#xff0c;大型企业适用) 都…

【Linux网络】网络编程套接字(TCP)

目录 地址转换函数 字符串IP转整数IP 整数IP转字符串IP 关于inet_ntoa 简单的单执行流TCP网络程序 TCP socket API 详解及封装TCP socket 服务端创建套接字 服务端绑定 服务端监听 服务端获取连接 服务端处理请求 客户端创建套接字 客户端连接服务器 客户端…

Translumo:基于.NET开发的开源的屏幕实时翻译工具

推荐一个高级实时屏幕翻译器&#xff0c;可用于游戏、视频实时翻译。 01 项目简介 Translumo是基于.Net开发的、开源屏幕翻译器软件&#xff0c;它可以实时检测并翻译屏幕上所选区域中出现的文本&#xff0c;如视频的字幕和图片中的文字等。 项目架构如下&#xff1a; 02 项…

QT问题 打开Qt Creator发现没有菜单栏

之前不知道按了什么快捷键,当我再次打开Qt Creator时发现菜单栏消失啦 找了许多原因发现:安装有道词典的快捷键Ctrl Alt m 与Qt Creator里的快捷键冲突导致菜单栏被莫名其妙的隐藏 解决方法: 1找到有道词典快捷键 2再次按快捷键 Ctrl Alt m就可以重新显示菜单栏

JavaScript原型继承与面向对象编程思想

原型继承与面向对象编程思想 在JavaScript中&#xff0c;原型(prototype)、构造函数(constructor)和实例对象(instance)是面向对象编程中的重要概念&#xff0c;并且它们之间存在着紧密的关系。 原型(prototype)&#xff1a;原型是JavaScript中对象之间关联的一种机制。每个Ja…

1.0 vue环境安装

1、安装node.js 1.1 下载最新版本Node.js (nodejs.org)Node.js 1.2 开始安装 普通的安装过程&#xff0c;也记录下吧 安装完成&#xff01; 1.3 检查nodejs是否安装成功 代开cmd命令窗口输入 node -v&#xff0c;如果看到了刚才下载的版本号&#xff0c;则表示已经安装成功…

Go 中如何高效遍历目录?探索几种方法

嗨&#xff0c;大家好&#xff01;我是波罗学。本文是系列文章 Go 技巧第十八篇&#xff0c;系列文章查看&#xff1a;Go 语言技巧。 目录遍历是一个很常见的操作&#xff0c;它的使用场景有如文件目录查看&#xff08;最典型的应用如 ls 命令&#xff09;、文件系统清理、日志…

基于yolov5的苹果检测(pytorch框架)【python源码+UI界面+功能源码详解】

功能演示&#xff1a; 基于yolov5的苹果检测系统&#xff0c;系统既能够实现图像检测&#xff0c;也可以进行视屏和摄像实时检测_哔哩哔哩_bilibili &#xff08;一&#xff09;简介 基于yolov5的苹果检测系统是在pytorch框架下实现的&#xff0c;这是一个完整的项目&#xf…

axure9.0 工具使用思考

原型设计软件【AxureRP】快速原型设计工具原型设计软件【AxureRP】快速原型设计工具原型设计软件【AxureRP】快速原型设计工具原型设计软件【AxureRP】快速原型设计工具原型设计软件【AxureRP】快速原型设计工具原型设计软件【AxureRP】快速原型设计工具原型设计软件【AxureRP】…

Vue学习之计算属性

模板中的表达式虽然方便&#xff0c;但也只能用来做简单的操作。如果在模板中写太多逻辑&#xff0c;会让模板变得臃肿&#xff0c;难以维护。比如说&#xff0c;我们有这样一个包含嵌套数组的对象&#xff1a; const author reactive({name: John Doe,books: [Vue 2 - Advan…

深入了解负载均衡器

每个负载均衡器都是反向代理&#xff0c;但并非每个反向代理都必须是负载均衡器。 0*GFvXmdPz97bwF8vU.jpeg 问题&#xff1a; OSI模型是什么样的&#xff1f; 1*JzMQUxqHiATuQlIgyIv-xQ.png 问题&#xff1a; 负载均衡器的需求是什么&#xff1f; 答案 → 为了创建一个容错系统…

SpringCloud(16)之SpringCloud OpenFeign和Ribbon

一、Spring Cloud OpenFeign介绍 Feign [feɪn] 译文 伪装。Feign是一个轻量级的Http封装工具对象,大大简化了Http请求,它的使用方法 是定义一个接口&#xff0c;然后在上面添加注解。不需要拼接URL、参数等操作。项目主页&#xff1a;GitHub - OpenFeign/feign: Feign makes w…

Promethues的Agent 模式代理转发的实施教程

目录 一、为什么需要代理转发&#xff1f; 二、Prometheus Agent模式的实施步骤 1、升级Prometheus的版本 2、配置B服务器的配置文件 3、启动代理点B服务器的Prometheus 4、接收端C服务器的Prometheus的安装同步骤1 5、启动接收端C服务器的Prometheus 6、验证是否能够正…

CVE-2023-44313 Apache ServiceComb Service-Center SSRF 漏洞研究

本次项目基于go语言&#xff08;本人不精通&#xff09;&#xff0c;虽不是java web框架了 &#xff0c;但搭建web服务的框架一些思想理念却是通用的&#xff0c;我们由此可以得到一些蛛丝马迹....... 目录 漏洞简介 漏洞分析 漏洞复现 漏洞简介 Apache ServiceComb Servi…

【MySQL系列 04】深入浅出索引

一、索引介绍 提到数据库索引&#xff0c;相信大家都不陌生&#xff0c;在日常工作中会经常接触到。比如某一个 SQL 查询比较慢&#xff0c;分析完原因之后&#xff0c;你可能就会说“给某个字段加个索引吧”之类的解决方案。 但到底什么是索引&#xff0c;索引又是如何工作的…

Stable Diffusion 模型分享:A-Zovya RPG Artist Tools(RPG 大师工具箱)

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八 下载地址 模型介绍 A-Zovya RPG Artist Tools 模型是一个针对 RPG 训练的一个模型&#xff0c;可以生成一些 R…

sql注入 [极客大挑战 2019]FinalSQL1

打开题目 点击1到5号的结果 1号 2号 3号 4号 5号 这里直接令传入的id6 传入id1^1^1 逻辑符号|会被检测到&#xff0c;而&感觉成了注释符&#xff0c;&之后的内容都被替换掉了。 传入id1|1 直接盲注比较慢&#xff0c;还需要利用二分法来编写脚本 这里利用到大佬的脚…

如何使用ChatGPT创建一份优质简历

目录 第一步&#xff1a;明确目标和重点 第二步&#xff1a;与ChatGPT建立对话 第三步&#xff1a;整理生成的内容 第四步&#xff1a;注重行文风格 第五步&#xff1a;强调成就和量化结果 第六步&#xff1a;个性化和定制 第七步&#xff1a;反复修改和完善 总结 在现…

国家建筑装配式内装产业基地在沪成立,副主任单位优积科技协同助推绿色低碳循环发展

上海市室内装饰行业协会装配式内装产业专业委员会成立大会暨“国家建筑装配式内装产业基地”项目启动会于3月21日下午1点在上海光大酒店隆重举行。出席此次活动的包括市装协会长徐国俭&#xff0c;市装协党支部书记兼秘书长丛国梁&#xff0c;市装协装配式内装委主任顾泰昌&…