Flink实时电商数仓之DWS层

需求分析

  • 关键词
    在这里插入图片描述
  • 统计关键词出现的频率

IK分词

进行分词需要引入IK分词器,使用它时需要引入相关的依赖。它能够将搜索的关键字按照日常的使用习惯进行拆分。比如将苹果iphone 手机,拆分为苹果,iphone, 手机。

<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.17</artifactId>
</dependency><dependency><groupId>com.janeluo</groupId><artifactId>ikanalyzer</artifactId>
</dependency>

测试代码如下:

public class IkUtil {public static void main(String[] args) throws IOException {String s = "Apple 苹果15 5G手机";StringReader stringReader = new StringReader(s);IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);//第二个参数表示是否再对拆分后的单词再进行拆分,true时表示不在继续拆分Lexeme next = ikSegmenter.next();while (next!= null) {System.out.println(next.getLexemeText());next = ikSegmenter.next();}}
}

整体流程

  1. 创建自定义分词工具类IKUtil,IK是一个分词工具依赖
  2. 创建自定义函数类
  3. 注册函数
  4. 消费kafka DWD页面主题数据并设置水位线
  5. 从主流中过滤搜索行为
    • page[‘item’] is not null
    • item_type : “keyword”
    • last_page_id: “search”
  6. 使用分词函数对keyword进行拆分
  7. 对keyword进行分组开窗聚合
  8. 写出到doris
    • 创建doris sink
    • flink需要打开检查点才能将数据写出到doris

在这里插入图片描述

具体实现

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;/*** title:** @Author 浪拍岸* @Create 28/12/2023 上午11:06* @Version 1.0*/
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {public static void main(String[] args) {new DwsTrafficSourceKeywordPageViewWindow().start(10021,4,"dws_traffic_source_keyword_page_view_window");}@Overridepublic void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//1. 读取主流dwd页面主题数据tableEnv.executeSql("create table page_info(\n" +"    `common` map<string,string>,\n" +"    `page` map<string,string>,\n" +"    `ts` bigint,\n" +"    `row_time` as to_timestamp_ltz(ts,3),\n" +"     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));//测试是否获取到数据//tableEnv.executeSql("select * from page_info").print();//2. 筛选出关键字keywordsTable keywrodTable = tableEnv.sqlQuery("select\n" +"    page['item'] keywords,\n" +"    `row_time`,\n" +"    ts\n" +" from page_info\n" +" where page['last_page_id'] = 'search'\n" +" and page['item_type'] = 'keyword'\n" +" and page['item'] is not null");tableEnv.createTemporaryView("keywords_table", keywrodTable);// 测试是否获取到数据//tableEnv.executeSql("select * from keywords_table").print();//3. 自定义分词函数并注册tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class );//4. 调用分词函数对keywords进行拆分Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +" from keywords_table" +" left join lateral Table(kwSplit(keywords)) on true");tableEnv.createTemporaryView("split_kw_table", splitKwTable);//tableEnv.executeSql("select * from split_kw_table").print();//5. 对keyword进行分组开窗聚合Table windowAggTable = tableEnv.sqlQuery("select\n" +"    keyword,\n" +"    cast(tumble_start(row_time,interval '10' second ) as string) wStart,\n" +"    cast(tumble_end(row_time,interval '10' second ) as string) wEnd,\n" +"    cast(current_date as string)  cur_date,\n" +"    count(*) keyword_count\n" +"from split_kw_table\n" +"group by tumble(row_time, interval '10' second), keyword");//tableEnv.createTemporaryView("result_table",table);//tableEnv.executeSql("select keyword,keyword_count+1 from result_table").print();//6. 写出到doristableEnv.executeSql("create table doris_sink\n" +"(\n" +"    keyword                STRING,\n" +"    wStart                 STRING,\n" +"    wEnd                   STRING,\n" +"    cur_date               STRING,\n" +"    keyword_count          BIGINT\n" +")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));windowAggTable.insertInto("doris_sink").execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2660632.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Kubernetes 学习总结(41)—— 云原生容器网络详解

背景 随着网络技术的发展&#xff0c;网络的虚拟化程度越来越高&#xff0c;特别是云原生网络&#xff0c;叠加了物理网络、虚机网络和容器网络&#xff0c;数据包在网络 OSI 七层网络模型、TCP/IP 五层网络模型的不同网络层进行封包、转发和解包。网络数据包跨主机网络、容器…

开箱即用的企业级数据和业务管理中后台前端框架Ant Design Pro 5的开箱使用和偏好配置

Ant Design Pro 介绍 Ant Design Pro 是一个开箱即用的企业级前端解决方案&#xff0c;基于 Ant Design 设计体系&#xff0c;提供了丰富的组件和功能&#xff0c;帮助开发者更快速地开发和部署企业级应用。 Ant Design Pro 使用 React、umi 和 dva 这三个主要的前端开发技术…

elementui+vue2 input输入框限制只能输入数字

方法1 自定义表单校验 <el-form :model"Formdata" ref"formRef" :rules"nodeFormRules" label-width"100px"><el-form-itemlabel"年龄"prop"age"><el-input v-model.number"Formdata.age&q…

HackTheBox-Machines--Photobomb

文章目录 1 端口扫描2 测试思路3 Web漏洞探测4 权限提升 Photobomb 测试过程 1 端口扫描 nmap -sC -sV 10.129.57.2102 测试思路 目标开启了22、80端口&#xff0c;所以测试点还是从80端口开始。 针对80端口的测试&#xff1a;   1.目录扫描   2.网页源代码   3.web漏洞 …

Java开发框架和中间件面试题(10)

目录 104.怎么保证缓存和数据库数据的一致性&#xff1f; 105.什么是缓存穿透&#xff0c;什么是缓存雪崩&#xff1f;怎么解决&#xff1f; 106.如何对数据库进行优化&#xff1f; 107.使用索引时有哪些原则&#xff1f; 108.存储过程如何进行优化&#xff1f; 109.说说…

白话机器学习的数学-1-回归

1、设置问题 投入的广告费越多&#xff0c;广告的点击量就越高&#xff0c;进而带来访问数的增加。 2、定义模型 定义一个函数&#xff1a;一次函数 y ax b &#xff08;a 是斜率、b 是截距&#xff09; 定义函数&#xff1a; 3、最小二乘法 例子&#xff1a; 用随便确定的参…

node 项目中 __dirname / __filename 是什么,为什么有时候不能用?

__dirname 是 Node.js 中的一个特殊变量&#xff0c;表示当前执行脚本所在的目录的绝对路径。 __filename 同理&#xff0c;是 Node.js 中的一个特殊变量&#xff0c;表示当前执行脚本的绝对路径&#xff0c;包括文件名。 在 Node.js 中&#xff0c;__dirname / __filename是…

用通俗易懂的方式讲解大模型:Prompt 提示词在开发中的使用

OpenAI 的 ChatGPT 是一种领先的人工智能模型&#xff0c;它以其出色的语言理解和生成能力&#xff0c;为我们提供了一种全新的与机器交流的方式。但不是每个问题都可以得到令人满意的答案&#xff0c;如果想得到你所要的回答就要构建好你的提示词 Prompt。本文将探讨 Prompt 提…

鸿鹄电子招投标系统:基于Spring Boot、Mybatis、Redis和Layui的企业电子招采平台源码与立项流程

在数字化时代&#xff0c;企业需要借助先进的数字化技术来提高工程管理效率和质量。招投标管理系统作为企业内部业务项目管理的重要应用平台&#xff0c;涵盖了门户管理、立项管理、采购项目管理、采购公告管理、考核管理、报表管理、评审管理、企业管理、采购管理和系统管理等…

elasticsearch系列九:异地容灾-CCR跨集群复制

概述 起初只在部分业务中采用es存储数据&#xff0c;在主中心搭建了个集群&#xff0c;随着es在我们系统中的地位越来越重要&#xff0c;数据也越来越多&#xff0c;针对它的安全性问题也越发重要&#xff0c;那如何对es做异地容灾呢&#xff1f; 今天咱们就一起看下官方提供的…

分布式事务之最终一致性

分布式事务之最终一致性 参考链接分布式事务基础理论概述案例解决方案:RocketMQ可靠消息注意事项&#xff1a;代码实现 参考链接 原文链接&#xff1a;https://blog.csdn.net/jikeyeka/article/details/126296938 分布式事务基础理论 基于上述的CAP和BASE理论,一般情况下会保…

西北大学844计算机类考研-25级初试高分总攻略

西北大学844计算机类考研-25级初试高分攻略 个人介绍 ​ 本人是西北大学22级软件工程研究生&#xff0c;考研专业课129分&#xff0c;过去一年里在各大辅导机构任职&#xff0c;辅导考研学生专业课844&#xff0c;辅导总时长达400小时&#xff0c;辅导学生超过20余人&#xf…

展现无限创意的Photoshop 2023 Mac/win中文版:打造您的独特艺术之旅

无论您是摄影师、设计师还是艺术家&#xff0c;Photoshop 2023&#xff08;ps 2023&#xff09;都是您不可或缺的创意工具。最新升级的Photoshop 2023带来了更多令人兴奋的功能和改进&#xff0c;让您能够以前所未有的方式展现无限创意。 首先&#xff0c;Photoshop 2023拥有强…

uni-app引入vant表单(附源码)

新建项目 下载安装vant npm i vant main.js引入 import { Form } from vant; import { Field } from vant;Vue.use(Form); Vue.use(Field);代码引入 <van-form submit"onSubmit"><van-fieldclass"rePwd"v-model"username"name"请…

SpringBoot 接口对数据枚举类型的入参以及出参转换处理

目录 1、在项目中使用枚举类型2、不做任何处理的演示效果2.1、接口出参2.2、接口入参 3、用枚举的code作为参数和返回值3.1 代码案例3.1.1、定义枚举基础接口BaseEnum&#xff0c;每个枚举都实现该接口3.1.2、性别Sex枚举并实现接口BaseEnum3.1.3、定义BaseEnum枚举接口序列化3…

Python+OpenCV 零基础学习笔记(4-5):计算机图形基础+Python相对文件路径+OpenCV图像+OpenCV视频

文章目录 相关链接运行环境前言计算机图形OpenCV简单使用图形读取文件读取可能会出现的问题&#xff1a;路径不对解决方案其它路径问题解决方案 图像显示保存OpenCV视频视频素材如何获取&#xff1f;简单视频读取 相关链接 【2022B站最好的OpenCV课程推荐】OpenCV从入门到实战 …

Spring高手之路-Spring事务的传播机制(行为、特性)

目录 含义 七种事务传播机制 1.REQUIRED&#xff08;默认&#xff09; 2.REQUIRES_NEW 3.SUPPORTS 4.NOT_SUPPORTED 5.MANDATORY 6.NEVER 7.NESTED 含义 事务的传播特性指的是当一个事务方法被另一个事务方法调用时&#xff0c;这个事务方法应该如何进行&#xff1f; 七…

HTTP限流控制:Go语言中的精细把关

开场白&#xff1a;在Web应用中&#xff0c;流量控制是一个关键的防护措施&#xff0c;用于防止资源过度消耗和潜在的安全威胁。特别是在面对DDoS攻击或异常请求时&#xff0c;限流显得尤为重要。今天&#xff0c;我们将探讨如何在Go语言中实现HTTP的限流控制。 知识点一&…

elasticsearch-hadoop.jar 6.8版本编译异常

## 背景 重新编译 elasticsearch-hadoop 包&#xff1b; GitHub - elastic/elasticsearch-hadoop at 6.8 编译 7.17 版本时很正常&#xff0c;注意设置下环境变量就好&#xff0c;JAVA8_HOME/.... 编译 6.8 版本时&#xff08;要求jdk8 / jdk9&#xff09;&#xff0c;出现…

使用 Django 的异步特性提升 I/O 类操作的性能

目录 一、引言 二、Django 的异步特性 三、提升 I/O 类操作的性能 四、示例代码 五、总结 一、引言 Django 是一个高级的 Python Web 框架&#xff0c;它以快速开发和简洁的代码而闻名。然而&#xff0c;对于一些 I/O 密集型的应用程序&#xff0c;Django 的同步特性可能…