弱结构化日志 Flink SQL 怎么写?SLS SPL 来帮忙

作者:潘伟龙(豁朗)

背景

日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台,在实时数据分析、风控检测等场景应用广泛。阿里云 Flink 原生支持阿里云日志服务 SLS 的 Connector,用户可以在阿里云 Flink 平台将 SLS 作为源表或者结果表使用。

阿里云 Flink SLS Connector 对于结构化的日志非常直接,通过配置,SLS 的日志字段可以与 Flink SQL 的 Table 字段列一一映射;然后仍有大量的业务日志并非完全的结构化,例如会将所有日志内容写入一个字段中,需要正则提前、分隔符拆分等手段才可以提取出结构化的字段,基于这个场景,本文介绍一种使用 SLS SPL 配置 SLS Connector 完成数据结构化的方案,覆盖日志清洗与格式规整场景。

弱结构化日志处理的痛点

弱结构化日志现状与结构化处理需求的矛盾

日志数据往往是多种来源,多种格式,往往没有固定的 Schema,所以在数据处理前,需要先对数据进行清洗、格式规整,然后在进行数据分析;这类数据内容格式是不固定的,可能是 JSON 字符串、CSV 格式,甚至是不规则的 Java 堆栈日志。

Flink SQL 是一种兼容 SQL 语法的实时计算模型,可以基于 SQL 对结构化数据进行分析,但同时也要求源数据模式固定:字段名称、类型、数量是固定;这也是 SQL 计算模型的基础。

日志数据的弱结构化特点与 Flink SQL 结构化分析之间有着一道鸿沟,跨越这道鸿沟需要一个中间层来进行数据清洗、规整;这个中间层的方案有多种选择可以使用,下面会对不同的方案做简单对比,并提出一种新的基于 SLS SPL 的方案来轻量化完成解决数据清洗规整的工作。

弱结构化日志数据

下面是一条日志示例,日志格式较为复杂,既有 JSON 字符串,又有字符串与 JSON 混合的场景。其中:

  • Payload 为 JSON 字符串,其中 schedule 字段的内容也是一段 JSON 结构。
  • requestURL 为一段标准的 URL Path 路径。
  • error 字段是前半部分包含 CouldNotExecuteQuery:字符串,后半部分是一段 JSON 结构。
  • tag:path 包含日志文件的路径,其中 service_a 可能是业务名称。
  • caller 中包含文件名与文件行数。
{"Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}","TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887","TaskType": "ALERT","__source__": "11.199.97.112","__tag__:__hostname__": "iabcde12345.cloud.abc121","__tag__:__path__": "/var/log/service_a.LOG","caller": "executor/pool.go:64","error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}","requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s","ts": "2024-01-29 22:57:13"
}

结构化数据处理需求

对于这样的日志提取出更有价值的信息需要进行数据清洗,首先需要提取重要的字段,然后对这些字段进行数据分析;本篇关注重要字段的提取,分析仍然可以在 Flink 中进行。

假设提取字段具体需求如下:

  • 提取 error 中的 httpCode、errorCode、errorMessage、requestID。
  • 提取 tag:path 中的 service_a 作为 serviceName。
  • 提取 caller 中的 pool.go 作为 fileName,64 作为 fileNo。
  • 提取 Payload 中的 project;提取 Payload 下面的 schedule 中的 type 为 scheuleType。
  • 重命名 source 为 serviceIP。
  • 其余字段舍弃。

最终需要的字段列表如下,基于这样一个表格模型,我们可以便捷的使用 Flink SQL 进行数据分析。

图片

解决方案

实现这样的数据清洗,有很多种方法,这里列举几种基于 SLS 与 Flink 的方案,不同方案之间没有绝对的优劣,需要根据不同的场景选择不同的方案。

数据加工方案: 在 SLS 控制台创建目标 Logstore,通过创建数据加工任务,完成对数据的清洗。

图片

Flink 方案: 将 error 和 payload 指定为源表字段,通过 SQL 正则函数、JSON 函数对字段进行解析,解析后的字段写入临时表,然后对临时表进行分析。

图片

SPL 方案: 在 Flink SLS Connector 中配置 SPL 语句,对数据进行清洗,Flink 中源表字段定义为清洗后的数据结构。

图片

从上述三种方案的原理不难看出,在需要数据清洗的场景中,在 SLS Connector 中配置 SPL 是一种更轻量化的方案,具有轻量化、易维护、易扩展的特点。

在日志数据弱结构化的场景中,SPL 方案既避免了方案一中创建临时中间 Logstore,也避免了方案二中在 Flink 中创建临时表,在离数据源更近的位置进行数据清洗,在计算平台关注业务逻辑,职责分离更加清晰。

如何在 Flink 中使用 SPL

接下来以一段弱结构化日志为例,来介绍基于 SLS SPL 的能力来使用 Flink。为了便于演示,这里在 Flink 控制台配置 SLS 的源表,然后开启一个连续查询以观察效果。在实际使用过程中,仅需修改 SLS 源表配置,即可完成数据清洗与字段规整。

SLS 准备数据

  • 开通 SLS,在 SLS 创建 Project,Logstore,并创建具有消费 Logstore 的权限的账号 AK/SK。
  • 当前 Logstore 数据使用 SLS SDK 写入模拟数据,格式使用上述日志片段,其中包含 JSON、复杂字符串等弱结构化字段。

图片

预览 SPL 效果

在 Logstore 可以可以开启扫描模式,SLS SPL 管道式语法使用分隔符分割不同的指令,每次输入一个指令可以即时查看结果,然后增加管道数,渐进式、探索式获取最终结果。

图片

对上图中的 SPL 进行简单描述:

* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, '$.type') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project
  • 1 行:project 指令:从原始结果中保留 Payload、error、tag:path、caller 字段,舍弃其他字段,这些字段用于后续解析。
  • 2 行:parse-json 指令:将 Payload 字符串展开为 JSON,第一层字段出现在结果中,包括 lastNotified、serviceUri、jobID 等。
  • 3 行:project-away 指令:去除原始 Payload 字段。
  • 4 行:parse-regexp 指令:按照 error 字段中的内容,解析其中的部分 JSON 内容,置于 errorJson 字段。
  • 5 行:parse-json 指令:展开 errorJson 字段,得到 httpCode、errorCode、errorMessage 等字段。
  • 6 行:parse-regexp 指令:通过正则表达式解析出 tag:path 种的文件名,并命名为 serviceName。
  • 7 行:parse-regexp 指令:通过正则表达式捕获组解析出 caller 种的文件名与行数,并置于 fileName、fileNo 字段。
  • 8 行:project-rename 指令:将 tag:hostname 字段重命名为serviceHost。
  • 9 行:extend 指令:使用 json_extract_scalar 函数,提取 schedule 中的 type 字段,并命名为 scheduleType。
  • 10 行:project 指令:保留需要的字段列表,其中 project 字段来自于 Payload。

创建 SQL 作业

在阿里云 Flink 控制台创建一个空白的 SQL 的流作业草稿,点击下一步,进入作业编写。

图片

在作业草稿中输入如下创建临时表的语句:

CREATE TEMPORARY TABLE sls_input_complex (errorCode STRING,errorMessage STRING,fileName STRING,fileNo STRING,httpCode STRING,requestID STRING,scheduleType STRING,serviceHost STRING,project STRING,proctime as PROCTIME()
) WITH ('connector' = 'sls','endpoint' ='cn-beijing-intranet.log.aliyuncs.com','accessId' = '${ak}','accessKey' = '${sk}','starttime' = '2024-02-01 10:30:00','project' ='${project}','logstore' ='${logtore}','query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project');
  • 其中 a k , {ak}, ak,{sk}, p r o j e c t , {project}, project,{logstore} 需要替换为有消费权限的 AK 账号。
  • query 字段,替换为上述 SPL,注意在阿里云 Flink 控制台需要对单引号使用单引号转义,并且消除换行符。
  • SPL 最终得到的字段列表与 TABLE 中字段对应。

连续查询及效果

在作业中输入分析语句,查看结果数据:

SELECT * FROM sls_input_complex

点击右上角调试按钮,进行调试,可以看到 TABLE 中每一列的值,对应 SPL 处理后的结果。

图片

总结

为了适应弱结构化日志数据的需求,Flink SLS Connector 进行了升级,支持直接通过 Connector配置 SPL 的方式实现 SLS 数据源的清洗下推,特别是需要正则字段提取、JSON 字段提取、CSV 字段提取场景下,相较原数据加工方案和原 Flink SLS Connector 方案更轻量级,让数据清洗的职责更加清晰,在数据源端完成数据清洗工作,也可以减少数据的网络传输流量,使得到达 Flink 的数据已经是规整好的数据,可以更加专注在 Flink 中进行业务数据分析。

同时为了便于 SPL 验证测试,SLS 扫描查询也已支持使用 SPL 进行查询,可以实时看到 SPL 管道式语法执行结果。

参考链接:

[1] 日志服务概述

https://help.aliyun.com/zh/sls/product-overview/what-is-log-service

[2] SPL 概述

https://help.aliyun.com/zh/sls/user-guide/spl-overview

[3] 阿里云 Flink Connector SLShttps://help.aliyun.com/zh/flink/developer-reference/log-service-connector[4] SLS 扫描查询

https://help.aliyun.com/zh/sls/user-guide/scan-based-query-overview

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2813217.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

第十四届校模拟赛第一期(一)

“须知少时凌云志,自许人间第一流” 鄙人11月八号有幸参加学校校选拔赛,题型为5道填空题,5道编程题,总时间为4小时。奈何能力有限,只完成了5道填空和3道编程大题,现进行自省自纠,分享学习&#…

四 . 分支和循环——Java基础篇

四 . 分支和循环 1 . switch的基本语法 if 和 swicth 的对比: if既可以用于范围校验, 也可以用于等值校验swicth对于if效率更高,只能用于等值校验 语法格式: switch(表达式){case 常量值1:语句块1;//break;case 常量值2:语句块2;//break; // ...[default:语句块n1;break;] }…

面试redis篇-10Redis集群方案-主从复制

在Redis中提供的集群方案总共有三种: 主从复制哨兵模式分片集群主从复制 单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离。 主从数据同步原理 Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每…

Wireshark TS | Linux 系统对时问题

问题描述 节前业务运维同事提交了一个 case ,说是部署在新业务区域的 Linux 服务器和老业务区域的 Linux 服务器无法对时,脚本里使用的是 clockdiff 命令,无法正常返回结果,而在老业务区域两台服务器之间执行命令就正常&#xff…

【笔记】深度学习入门:基于Python的理论与实现(三)

误差反向传播法 一 个能够高效计算权重参数的梯度的方法 计算图 正向传播 太郎在超市买了 2 个 100 日元一个的苹果,消费税是 10%,请计 算支付金额。 反向传播(导数) 如果苹果的价格增加某个微小值, 则最终的支付金额…

【JavaSE】实用类——String、日期等

目录 String类常用方法String类的equals()方法String中equals()源码展示 “”和equals()有什么区别呢? StringBuffer类常用构造方法常用方法代码示例 面试题:String类、StringBuffer类和StringBuilder类的区别?日期类Date类Calendar类代码示例…

Redis学习------实战篇----2024/02/27

1.导入项目 2.基于Session实现登录 手机验证码完整实现 /*** 发送验证码* param phone* param session* return*/Overridepublic Result sendCode(String phone, HttpSession session) {//1.校验手机号if(RegexUtils.isPhoneInvalid(phone)){//2.如果不符合,返回错…

非常好!超齐全的故障诊断数据集及相关实验平台介绍

故障诊断数据集目录 一. 故障诊断数据集库介绍 二. 轴承故障诊断数据集 1.美国-凯斯西储大学轴承数据中心轴承数据集 2.SUDA试验台数据集 3.美国-机械故障预防技术学会MFPT 4.德国-帕德伯恩大学Paderborn轴承数据集 5.SDUST山东科技大学数据集 6.SEU东南大学轴承数据集 …

ARMv8-AArch64 的异常处理模型详解之异常处理详解(同步异常和异步异常的分析和处理)

这里写目录标题 一,同步异常的分析1.1 同步异常分析-异常链接寄存器ELR1.2 同步异常分析-异常综合寄存器ESR,Exception Syndrome Register1.3 同步异常分析-错误地址寄存器FAR,Fault Address Register 二, 同步异常的处理示例 Synchronous ex…

windows 11+docker desktop+grafana+influxDB+python写入

下载安装docker desktop 出现WSL相关的错误。WSL是一个linux内核的子系统,docker是基于linux内核的,所以运行docker需要WSL。 以管理员权限打开powershell,查看WSL状态 wsl --status 我遇到的错误是因为我关闭了windows的某些更新 执行上…

SAP Business Technology Platform (BTP)的架构理解

长期以来,我在与客户和伙伴的沟通交流中发现大家依然对SAP业务技术平台 - SAP Business Technology Platform (以下简称BTP)纯有各种疑惑,借此机会借助我原来作为SAP内部IT开发的经验和近期一年来在客户前线的经验,简要聊一下我对BTP的架构理…

工厂安全智能巡检机器人系统开发及其对您的价值体现

在工厂生产环境中,安全生产一直是业主、采购商以及中间商们十分关注的焦点。为了提升工厂生产线的安全性、效率和可追溯性,一款工厂安全智能巡检机器人系统应运而生,旨在为需求方带来全新的生产安全管理模式。 自动巡检增加效率 传统的工厂…

SD-WAN技术:优化国内外服务器访问的关键

在全球化的商业环境中,企业经常需要在国内访问国外的服务器。然而,由于地理位置和网络架构的限制,这种跨国访问往往会遇到速度慢、延迟高等问题。SD-WAN(软件定义广域网)技术的兴起,为企业提供了一种新的解…

大文件传输之udp如何传输大量数据

在数字化时代,对大文件传输的需求正以前所未有的速度增长。无论是个人用户还是企业,都急切寻求一种能够快速且稳定地处理大量数据的传输方法。UDP(用户数据报协议)以其无连接的特性和高效的数据传输能力,成为了大文件传…

C语言自定义数据类型(三)结构体指针

所谓结构体指针就是指向结构体变量的指针,一个结构体变量的起始地址就是这个结构体变量的指针。如果把一个结构体变量的起始地址存放在一个指针变量中,那么,这个指针变量就指向该结构体变量。 目录 一、指向结构体变量的指针 1.1举例说明 …

就业班 2401--2.26 Linux Day5--进程管理一

一、权限扩展 文件权限管理之: 隐藏权限防止root误删除 文件属性添加与查看 [rootlinux-server ~]# touch file1 file2 file3 1.查看文件属性 [rootlinux-server ~]# lsattr file1 file2 file3 ---------------- file1 ---------------- file2 ----------------…

【吴恩达·机器学习】第四章:详解神经网络:推理和训练

博主简介:努力学习的22级计算机科学与技术本科生一枚🌸博主主页: Yaoyao2024每日一言🌼: 勇敢的人,不是不落泪的人,而是愿意含着泪继续奔跑的人。 ——《朗读者》 0、声明 本系列博客文章是博主本人根据吴…

RubyMine 2023:让Ruby编程变得更简单 mac/win版

JetBrains RubyMine 2023是一款专为Ruby开发者打造的强大集成开发环境(IDE)。这款工具集成了许多先进的功能,旨在提高Ruby编程的效率和生产力。 RubyMine 2023软件获取 RubyMine 2023的智能代码编辑器提供了丰富的代码补全和提示功能&#…

深度学习基础(三)循环神经网络(RNN)

之前的章节我们初步介绍了卷积神经网络(CNN): 深度学习基础(二)卷积神经网络(CNN)-CSDN博客文章浏览阅读2次。卷积神经网络(CNN)的应用领域广泛,尤其在图像处…

Delphi 报错 Type androidx.collection.ArraySet is defined multiple times

Delphi 11 建立一个新的 Multi-Device Application 编译成app的时候报错 报错信息 [PAClient Error] Error: E7688 Unable to execute "E:\Program\Java\jdk1.8.0_301\bin\java.exe" -cp "e:\program\embarcadero\studio\22.0\bin\Android\r8-3.3.28.jar"…