Spark+实例解读

第一部分 Spark入门

学习教程:Spark 教程 | Spark 教程

Spark 集成了许多大数据工具,例如 Spark 可以处理任何 Hadoop 数据源,也能在 Hadoop 集群上执行。大数据业内有个共识认为,Spark 只是Hadoop MapReduce 的扩展(事实并非如此),如Hadoop MapReduce 中没有的迭代查询和流处理。然而Spark并不需要依赖于 Hadoop,它有自己的集群管理系统。更重要的是,同样数据量,同样集群配置,Spark 的数据处理速度要比 Hadoop MapReduce 快10倍左右。

Spark 的一个关键的特性是数据可以在内存中迭代计算,提高数据处理的速度。虽然Spark是用 Scala开发的,但是它对 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。

第二部分 SparkCore

2 RDD

2.1 转换算子-map

map是将RDD的数据一条条处理,返回新的RDD

# 定义方法
def add(data):return data*10
print(rdd.map(add).collect)
# 定义lamabda表达式
rdd.map(lambda data:data*10)
2.2 转换算子-flatMap

flatMap对RDD执行map操作,然后执行解除嵌套操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)}.reduceByKey(_ + _).map { case ((feature, label), num) =>(feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x =>val size_entro = x.map(_._2).sumval res = x.map(_._2.toDouble / size_entro).map { t =>-t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x => x / size }.map(_._2).sum

2.3转换算子-reduceByKey

针对KV型RRDD自动按照key进行分组,然后按照提供的聚合逻辑,对组内数据value完成聚合操作

rdd.reduceByKey(func)

      val clickStat = joinDf.where(F.col("active_type")==="click").rdd.map(row => {val mapInfo = Option(row.getMap[String,Double](row.fieldIndex(feat)))mapInfo match {case Some(x) => xcase _ => null}}).filter(_!=null).flatMap(x=>x).reduceByKey(_+_)
2.4 转换算子-mapValues

针对二元元组RDD,对其内部的二元元组的value进行map操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)}.reduceByKey(_ + _).map { case ((feature, label), num) =>(feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x =>val size_entro = x.map(_._2).sumval res = x.map(_._2.toDouble / size_entro).map { t =>-t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x => x / size }.map(_._2).sum
2.5 转换算子-groupBy

将RDD的数据进行分组

rdd.groupBy(func)

rdd = sc.parallelize([('a',1),('a',11),('b',1)])
# 通过这个函数确认按照谁来分组(返回谁即可)
print(rdd.groupBy(lambda x:x[0]).collect())
print(rdd.groupBy(lambda x:x[0]).collect())
# 结果为:
​
    val userContentListHis = spark.thriftSequenceFile(inpath_his, classOf[LongVideoUserContentStat]).map(l=>{(l.getUid,l.getContent_properties.get(0).getId)}).toDF("uid", "docid").groupBy($"uid")
2.6 转换算子-filter

过滤想要的数据进行保存

rdd = sc.parallelize([1,2,3,4,5,6])
rdd.filter(lamdba x:x%2 == 1) # 只保留奇数
    val treatmentUser = spark.read.option("header", false).option("sep", "\t").csv(inpath).select("_c0").withColumnRenamed("_c0", "userid").withColumn("flow", getexpId($"userid")).filter($"flow" >= start and $"flow" <= end).select("userid").dropDuplicates()
2.7 转换算子-其他算子
distinct算子
rdd.distinct() 一般不写去重分区val userContentHis = hisPathList.map(path =>{val hisData = spark.thriftSequenceFile(path, classOf[LongVideoUserContentStat])println(s"hisData ==>${hisData.count()}")hisData}).reduce(_ union _).distinct().repartition(partition)
union算子 
2个rdd合并成一个rdd:rdd.union(other_rdd)
只合并不去重  rdd的类型不同也是可以合并的
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,2,3,4])
rdd3 = rdd1.union(rdd2)
2.8 算子面试题
1.groupByKey和reduceByKey的区别:
groupByKey仅仅有分组功能而已,reduceByKey除了分组还有聚合作用,是一个分组+聚合一体化的算子. 分组前先聚合再shuffle,预聚合,被shuffle的数据极大的减少,提升了性能.数据量越大,reduceByKey的性能优势也就越大.
​
2.rdd的分区数怎么查看? 
通过getNumPartitions API查看,返回int
​
3.Transformation和Action的区别:
转换算子的返回值100%是rdd,而Action算子不一定.转换算子是懒加载的,只有遇到Action才会执行
​
4.哪两个算子不经过Driver直接输出?
foreach 和 saveAsTextFile

3 RDD的持久化

3.1 RDD的持久化

rdd是过程数据 rdd进行相互迭代计算,执行开启时,新的RDD生成,老的RDD消失

3.2 RDD的缓存

val rawLog = profilePushLogReader(spark, date, span).persist()
3.3 RDD的checkPoint

也是将RDD的数据保存起来,仅支持磁盘存储,被认为是安全的, 不保留血缘关系

3.4 缓存面试题

4 案例

4.1 搜素引擎日志分析案例
4.2
4.3 ....
4.4 计算资源面试题
1.如何尽量提升任务计算的资源?
计算cpu核心和内存量,通过--executor-memory指定executor内存,通过--executor-cores指定executor的核心数
​

5 广播变量 累加器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3268663.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

22 Python常用内置函数——枚举

enumerate() 函数用来枚举可迭代对象中的元素&#xff0c;返回可迭代的 enumerate 对象&#xff0c;其中每个元素都是包含索引和值的元组。 print(enumerate(abcd)) print(list(enumerate(abcd))) # 枚举字符串中的元素 print(list(enumerate([hello, world]))) # 枚举列表中…

【数据结构】:大厂面试经典链表OJ题目详解

反转链表 206. 反转链表 - 力扣&#xff08;LeetCode&#xff09; 思路解透 本题就是通过不停地将最先的 head 节点位置的后一位插到最前面&#xff0c;完成链表的反转 本题需要两个节点变量 cur&#xff1a;其任务就是定位到原 head 节点位置的前一位&#xff0c;然后将自己…

百日筑基第二十八天-23种设计模式-行为型总汇

百日筑基第二十八天-23种设计模式-行为型总汇 文章目录 百日筑基第二十八天-23种设计模式-行为型总汇前言模板方法模式简介模板方式的特点模板方法模式结构类图模板方式模式案例分析模板方法模式应用源码分析模板方法模式的注意事项和细节 迭代器模式迭代器模式结构类图迭代器模…

git配置环境变量

一.找到git安装目录 打开此git安装目录下的bin文件&#xff0c;复制此文件路径 二.配置环境变量 2.1 右键点击此电脑的属性栏 2.2 点击高级系统配置 2.3 点击环境变量 2.4 按图中步骤进行配置 三.配置完成 win r 输入cmd打开终端 终端页面中输入 git --version 如图所示…

给定日期计算时间(2025新年倒计时)

目录 1.安装所需安装包 2.查看安装包是否安装成功 ​ 3.使用 Pandas 读取数据文件 4.定义图像背景 5.matplotlib输出 6.当前指定格式时间 7.2025新年倒计时 1.安装所需安装包 pip install 包名 2.查看安装包是否安装成功 python -m pip list ​ 3.使用 Pandas 读取数…

深度解析Linux-C——结构体(初始化,结构体数组,结构体大小,位段操作,联合体,内存对齐,C的预处理,宏和带参宏,条件编译)

目录 结构体的三种初始化 结构体的两种引用 结构体数组 结构体大小 结构体实现位段操作 联合体 内存对齐 C的预处理 带参宏 条件编译 结构体的三种初始化 定义如下结构体 struct student {char name[100]; int age; float height; } ; 1、定义变量时初始化 s…

Redis从入门到超神-(十二)Redis监听Key的过期事件

前言 试想一个业务场景&#xff0c;订单超过30分钟未支付需要做自动关单处理,修改订单状态&#xff0c;库存回退等&#xff0c;你怎么实现&#xff1f;方案一&#xff1a;可以使用定时任务扫表&#xff0c;通过支付状态和下单时间来判断是否支付过期。但是这样的方案是非常消耗…

推荐一款.NET开源、简洁易用的Windows桌面小说阅读应用

前言 今天大姚给大家分享一款.NET开源、免费、简洁易用的Windows桌面小说阅读应用(是原生的 Windows 应用&#xff0c;为 Windows 11 系统设计)&#xff1a;CleanReader.Desktop。 该应用适合喜欢阅读网文或者是本地轻量阅读的用户。 系统要求 操作系统&#xff1a;Windows 11…

Llama + Dify,在你的电脑搭建一套AI工作流

theme: smartblue 点赞 关注 收藏 学会了 本文简介 最近字节在推Coze&#xff0c;你可以在这个平台制作知识库、制作工作流&#xff0c;生成一个具有特定领域知识的智能体。 那么&#xff0c;有没有可能在本地也部署一套这个东西呢&#xff1f;这样敏感数据就不会泄露了&…

Linux之基础IO(下)

目录 缓冲区的概念 深入理解文件系统 创建文件的整个过程 软链接 硬链接 上一节课我们学习了基础IO中的文件的读写操作&#xff0c;以及文件描述符的概念和重定向的基本原理&#xff0c;本期我们继续进行基础IO的学习。 缓冲区的概念 在讲缓冲区之前&#xff0c;大家先看…

Redis实战篇(黑马点评)笔记总结

一、配置前后端项目的初始环境 前端&#xff1a; 对前端项目在cmd中进行start nginx.exe&#xff0c;端口号为8080 后端&#xff1a; 配置mysql数据库的url 和 redis 的url 和 导入数据库数据 二、登录校验 基于Session的实现登录&#xff08;不推荐&#xff09; &#xf…

C++ - char*、const char*、char[]、string

const char* const char* 用来定义字符串常量。 char[ ] char型的字符数组是一种定长的数组&#xff0c;存储指定长度的字符序列&#xff0c;数组中的每个元素都是一个char类型的变量&#xff0c;如&#xff1a; char arr[] {h, a, l, l, o, \0}; char c arr[0]; // 访问…

使用 Windows 应用程序 SDK 构建下一代应用程序

微软面临的最大问题之一是如何让 Windows 再次成为吸引开发者的平台。无论用户使用什么设备和操作系统&#xff0c;都可以很容易地将 Web 前端放在支持桌面和移动用户的云原生应用程序上。 我们处在一个奇怪的境地&#xff0c;唯一能利用最新 PC 硬件的应用程序是 Office、Phot…

【中项第三版】系统集成项目管理工程师 | 第 11 章 规划过程组⑤ | 11.13 - 11.14

前言 第11章对应的内容选择题和案例分析都会进行考查&#xff0c;这一章节属于10大管理的内容&#xff0c;学习要以教材为准。本章上午题分值预计在15分。 目录 11.13 制定预算 11.13.1 主要输入 11.13.2 主要输出 11.14 规划质量管理 11.14.1 主要输入 11.14.2 主要工…

HTML前端面试题之<iframe>标签

面试题&#xff1a;iframe 标签的作用是什么?有哪些优缺点 ? 讲真&#xff0c;刷这道面试题之前我根本没有接触过iframe&#xff0c;网课没讲过&#xff0c;项目实战没用过&#xff0c;但却在面试题里出现了&#xff01;好吧&#xff0c;我只能说&#xff1a;前端路漫漫&…

数据挖掘-数据预处理

来自&#x1f96c;&#x1f436;程序员 Truraly | 田园 的博客&#xff0c;最新文章首发于&#xff1a;田园幻想乡 | 原文链接 | github &#xff08;欢迎关注&#xff09; 文章目录 3.3.1 数据的中心趋势平均数和加权平均数众数&#xff0c;中位数和均值描述数据的离散程度 &a…

快速搞定分布式RabbitMQ---RabbitMQ进阶与实战

本篇内容是本人精心整理&#xff1b;主要讲述RabbitMQ的核心特性&#xff1b;RabbitMQ的环境搭建与控制台的详解&#xff1b;RabbitMQ的核心API&#xff1b;RabbitMQ的高级特性;RabbitMQ集群的搭建&#xff1b;还会做RabbitMQ和Springboot的整合&#xff1b;内容会比较多&#…

火山引擎云搜索服务通过信通院向量数据库可信认证

7月16日&#xff0c;首届线下“可信数据库发展大会”在北京举办&#xff0c;会上中国信息通信研究院&#xff08;中国信通院&#xff09;公布了 2024 上半年“可信数据库”产品能力评测结果。火山引擎云搜索服务在基本功能、运维管理、安全性、兼容性、扩展性、高可用、工具生态…

【LeetCode 随笔】C++入门级,详细解答加注释,持续更新中。。。

文章目录 58.【简单】最后一个单词的长度&#x1f31f; &#x1f308;你好呀&#xff01;我是 山顶风景独好 &#x1f388;欢迎踏入我的博客世界&#xff0c;能与您在此邂逅&#xff0c;真是缘分使然&#xff01;&#x1f60a; &#x1f338;愿您在此停留的每一刻&#xff0c;都…

Html+Css网页开发之动态登录页面(默认Chrome)

>>效果展示图<< 一、需求分析与设计要求 实现了一个动态背景图案的效果&#xff0c;包括一个白色的容器&#xff0c;内部有一个标题、一个输入框、一个按钮和一些文本。 背景是一个渐变色的线性渐变&#xff0c;而在容器的周围&#xff0c;有一些随机的方形和圆形图…