大数据 - Spark系列《十一》- Spark累加器详解

 Spark系列文章:

大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客

大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客

大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客

大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客

大数据 - Spark系列《五》- Spark常用算子-CSDN博客

大数据 - Spark系列《六》- RDD详解-CSDN博客

大数据 - Spark系列《七》- 分区器详解-CSDN博客

大数据 - Spark系列《八》- 闭包引用-CSDN博客

大数据 - Spark系列《九》- 广播变量-CSDN博客

大数据 - Spark系列《十》- rdd缓存详解-CSDN博客

  1. 简介  

累加器用来把Executor端变量信息聚合到Driver端。在 Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回 Driver端进行merge。

观察一个问题: 原因是数据在executor端执行完毕以后并没有将acc结果数据返回

def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getScval rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)var count:Long = 0L//rdd.map(count+=_)rdd.foreach(num=>{count+=num})//计算的结果为0println(count)sc.stop()}

 

 解决方案:应该将每个executor执行的结果数据返回到Driver端进行聚合操作 , 返回最终结果数据

  2. LongAccumulator  

LongAccumulator 是 Spark 中的一种累加器(Accumulator)类型,用于在分布式计算中对长整型(Long)类型的数据进行累加。累加器是一种特殊的共享变量,它可以在各个节点上对其进行添加操作,并将结果汇总到驱动器程序中。

2.1 🥙主要特点:

  1. 分布式累加器: LongAccumulator 可以在不同节点上的任务中并行地对其进行添加操作,然后将结果汇总到驱动器程序中。

  2. 长整型数据类型: 适用于对长整型数据进行累加的场景,如计数器、求和等。

  3. 只支持累加操作: LongAccumulator 只支持累加操作,不能进行减法或其他运算。

  4. 原子性操作: 对累加器的操作是原子性的,可以保证在并发执行的情况下不会发生数据错误。

package com.doit.day0219import com.alibaba.fastjson.JSON
import com.doit.day0126.Movie
import org.apache.spark.{SparkConf, SparkContext}/*** @日期: 2024/2/20* @Author: Wang NaPao* @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343* @Tips: 和我一起学习吧* @Description: 使用 Spark 累加器统计解析 JSON 数据失败的次数*/object Test02 {def main(args: Array[String]): Unit = {// 创建SparkConf对象,并设置应用程序名称和运行模式val conf = new SparkConf().setAppName("Starting...") // 设置应用程序名称.setMaster("local[*]") // 设置运行模式为本地模式// 创建SparkContext对象,并传入SparkConf对象val sc = new SparkContext(conf)// 从文件加载 JSON 数据val rdd1 = sc.textFile("Data/movie.json")// 定义计数器变量//var cnt = 0// 使用 spark 内置的全局计数器val cnt1 = sc.longAccumulator("my long Accumulator") // 合并// 遍历 RDD 中的每一行数据rdd1.foreach(line => {try {// 尝试解析 JSON 数据val bean = JSON.parseObject(line, classOf[Movie])println(bean)} catch {case e: Exception => {// JSON 解析失败,增加计数器cnt1.add(1) // 计数1}}})// 打印累加器的值,即解析失败的次数println(cnt1.value) // 合并后的结果}
}

 

 

2.2 🥙注意事项:

  • 累加器数据属于全局变量 ,由行动算子触发执行 , 没有触发不执行累加 没算

  • 如果多次触发行动算子 , 累加器会执行多次

  • 建议将累加器的变化操作编写在行动算子中

2.3 🥙累加器方法

  •   add(value: T)  

向累加器中添加一个值,将这个值与累加器中已有的值进行累加。累加器的值类型必须与添加的值类型相符合。

package com.doit.day0219
import com.alibaba.fastjson.JSON
import com.doit.day0126.Movie
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/*** @日期: 2024/2/20* @Author: Wang NaPao* @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343* @Tips: 和我一起学习吧* @Description:*/object Test05 {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象,设置应用程序名称和运行模式val conf = new SparkConf().setAppName("Starting...") // 设置应用程序名称.setMaster("local[*]") // 设置运行模式为本地模式// 创建 SparkContext 对象,传入 SparkConf 对象val sc = new SparkContext(conf)val sumAccumulate = sc.longAccumulator("sumAccumulate")val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))rdd.foreach(x=>sumAccumulate.add(x))println("累加器的值:"+sumAccumulate.value) //15sc.stop()}
}
  •  reset() 

重置累加器的值为初始值,通常是零或空。

// 重置累加器的值为初始值
accumulator.reset()
  •  value 

获取累加器的当前值。

// 获取累加器的当前值
val currentValue = accumulator.value
println("当前累加器的值:" + currentValue)

2.3 🧀实例1

累加器,主要用于正常业务作业过程中的一些附属信息统计

*       (比如程序遇到的脏数据条数,

*       程序处理的数据总行数,

*       程序处理的特定数据条数,

*       程序处理所花费的时长)

业务上需要对如下数据进行统计:比如统计每个city的用户数

"1,Mr.duan,18,beijing"
"2,Mr.zhao,28,beijing"
"b,Mr.liu,24,shanghai"
"4,Mr.nai,22,shanghai"
"a,Mr.liu,24,shanghai"
"6,Mr.ma"

 同时,还想在业务统计的过程中,附带统计出原始数据中的脏数据条数,并按多种不正确的格式进行分别统计,如:id字段无法数字化的条数字段数量不够的条数其他不正确的条数

package com.doit.day0219import com.alibaba.fastjson.JSON
import com.doit.day0126.Movie
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @日期: 2024/2/20* @作者: Wang NaPao* @博客: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343* @Tips: 和我一起学习吧* @描述: 这个对象包含了一个 Spark 应用程序的入口点,用于处理从文件加载 JSON 数据的场景,统计每个城市的用户数量,并且附带统计原始数据中的脏数据条数。*/
object Test03 {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象,设置应用程序名称和运行模式val conf = new SparkConf().setAppName("Starting...") // 设置应用程序名称.setMaster("local[*]") // 设置运行模式为本地模式// 创建 SparkContext 对象,传入 SparkConf 对象val sc = new SparkContext(conf)// 从文件加载 JSON 数据val rdd1 = sc.textFile("Data/city.txt")// 创建一个累加器用于统计脏数据条数val cnt1 = sc.longAccumulator("dirtyDataCount")// 对 RDD 进行处理:将每行数据拆分为数组,判断数组长度,若为4则返回 (城市, 1),否则更新累加器并返回 Noneval rdd2 = rdd1.map(line => {try {val arr1 = line.split(",")val no = arr1(0).toIntval name = arr1(1)val age = arr1(2).toIntval city = arr1(3)(city,1)} catch {case exception: Exception => {cnt1.add(1)("error", 1)}}}).filter(e=>{e._1!="error"}) // 过滤掉脏数据.reduceByKey(_ + _) // 对相同键的值进行累加.foreach(println) // 打印结果// 打印脏数据条数println("脏数据条数为:" + cnt1.value)}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2803729.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

大型语言模型的语义搜索(一):关键词搜索

关键词搜索(Keyword Search)是文本搜索种一种常用的技术,很多知名的应用app比如Spotify、YouTube 或 Google map等都会使用关键词搜索的算法来实现用户的搜索任务,关键词搜索是构建搜索系统最常用的方法,最常用的搜索算法是Okapi BM25&#x…

【文件搜索项目】使用jdbc操作SQLite

一. 插入(采用变量的方式插入) 1.创建数据源.DateSource 2.建立连接 3.构造SQL语句 4.执行SQL语句 5.释放资源 public class TestSQLite {public static void main(String[] args) throws SQLException {textInsert();}public static void textInsert(…

超级抽象的前端2

vue3的调用方法失败的原因 function validateConfirm(rule, value, callback) {if (value ! form.password) {callback(new Error(两次输入的密码不一致))} else {callback()}function showAgreement() {dialogVisible.value true}function submitForm() {// 这里是提交表单的…

【RT-Thread基础教程】线程优先级、Tick与线程状态

文章目录 前言一、线程优先级1.1 线程优先级是什么1.2 设置优先级范围 二、时间片2.1 Tick是什么2.2 时间片是什么2.3 时间片轮转 三、线程状态3.1 线程有哪些状态3.2 完整的状态转换图 总结 前言 在 RT-Thread 操作系统中,线程的优先级、Tick 以及线程状态是非常重…

每日一练 | 华为认证真题练习Day187

1、关于BGP状态机描述错误的 A. IDIE状态下,BGP拒绝任何进入的连接请求,是BGP初始状态 B. ACTIVE状态下,BGP将尝试进行TCP连接的建立,是BGP的中间状态 C. ESTABLISHED状态下,BGP对等体间可以交换UPDATE报文&#xf…

算法--动态规划(背包问题)

这里写目录标题 总览dp问题的优化01背包问题概述算法思想算法思想中的注意点例题代码等效为一维数组 完全背包问题概述算法思想例题代码等效为二维数组等效为一维数组 多重背包问题概述算法思想例题代码优化(采用二进制的方式)基本思想时间复杂度例题代码…

开发Chrome插件,background.js中log打印未出现在控制台

不同于内容脚本(通常命名content.js),在后台脚本(通常命名background.js或service-worker.js)中console.log并不会在控制台中直接显示。 要查看后台脚本上下文的正确控制台,执行如下步骤: 访问…

2024年2月17日~2月23日周报

文章目录 一、前言二、DDNet架构学习2.1 数据预处理2.2 网络模型构建 三、基于深度学习地震数据去噪处理3.1 深度学习在地震数据去噪中的研究方向3.2 深度学习地震数据去噪流程3.2.1 数据集准备3.2.2 模型构建3.2.3 训练网络 3.3 基于DnCNN的地震数据去噪实验 四、小结4.1 存在…

前端项目打包体积分析与优化

一、安装依赖分析工具 npm install webpack-bundle-analyz 二、修改webpack.config.js文件 1、导入上面下载的包 2、在plugins里创建实例 三、启动打包命令 npm run build 会弹出如下界面: 四、优化 1、通过CDN导入react-dom文件 修改webpack.config.js文件里…

【前端素材】推荐优质后台管理系统GramOs平台模板(附源码)

一、需求分析 后台管理系统是一种用于管理网站、应用程序或系统的工具,它通常作为一个独立的后台界面存在,供管理员或特定用户使用。下面详细分析后台管理系统的定义和功能: 1. 定义 后台管理系统是一个用于管理和控制网站、应用程序或系统…

Linux环境非root用户配置SSH免密登录,并解决登录仍提示输入密码

Linux环境非root用户配置SSH免密登录,并解决登录仍提示输入密码 ssh免密登录的简单理解 以A和B进行举例:A免密登录B (即在A服务器输入命令:ssh 非root用户名B的IP地址)可以直接免密码直接登录 A生成私钥和公钥&#…

QT基本组件

四、基本组件 Designer 设计师(重点) Qt包含了一个Designer程序,用于通过可视化界面设计开发界面,保存文件格式为.ui(界面文件)。界面文件内部使用xml语法的标签式语言。 在Qt Creator中创建文件时&#xf…

​分享87个Html企业模板,总有一款适合您

分享87个Html企业模板,总有一款适合您 87个Html企业模板下载链接:https://pan.baidu.com/s/1iBpfaBRgMJBv4pj07rZhOg?pwd8888 提取码:8888 Python采集代码下载链接:采集代码.zip - 蓝奏云 学习知识费力气,收集…

【STM32】Keil RTE使用记录

0 前言 最近因为任务需要,再次开始研究STM32,打算过一遍之前记录的笔记,在创建工程模板时,突然发现一个之前被自己忽略的东西,那就是创建项目时会弹出的Run-Time Environment,抱着好奇的心态去找了一些资料…

Rust: reqwest库示例

一、异步处理单任务 1、cargo.toml [dependencies] tokio { version "1.0.0", features ["full", "tracing"] } tokio-util { version "0.7.0", features ["full"] } tokio-stream { version "0.1" }…

【学习笔记】数据结构与算法03:栈与队列

知识出处:Hello算法:https://www.hello-algo.com/. 文章目录 2.2 栈和队列2.2.1 「栈 stack」2.2.1.1 栈的常用操作2.2.1.2 栈的典型应用 2.2.2「队列 queue」2.2.2.1 队列的常用操作2.2.2.2 队列的典型应用 2.2.3 双向队列 「double-ended queue」2.2.3…

论文阅读——SimpleClick

SimpleClick: Interactive Image Segmentation with Simple Vision Transformers 模型直接在VIT上增加交互是分割 用VIT MAE方法训练的预训练权重 用交互式分割方法微调,微调流程: 1、在当前分割自动模拟点击,没有人为提供的点击 受到RITM启发…

如何使用Douglas-042为威胁搜索和事件应急响应提速

关于Douglas-042 Douglas-042是一款功能强大的PowerShell脚本,该脚本可以提升数据分类的速度,并辅助广大研究人员迅速从取证数据中筛选和提取出关键数据。 该工具能够搜索和识别Windows生态系统中潜在的安全漏洞,Douglas-042会将注意力放在…

Jmeter基础(2) 目录介绍

目录 Jmeter目录介绍bin目录docsextrasliblicensesprintable_docs Jmeter目录介绍 在学习Jmeter之前,需要先对工具的目录有些了解,也会方便后续的学习 bin目录 examplesCSV目录中有CSV样例jmeter.batwindow 启动文件jmeter.shMac/linux的启动文件jmete…

学习负载均衡的算法

什么负载均衡 负载均衡是一种计算机技术,用于在多个系统、网络链接、硬盘驱动器、CPU等之间分配工作负载,以优化资源使用、最大化吞吐量、最小化响应时间、并避免任何单一资源的过载。在网络负载均衡的情况下,它可以帮助将网络流量有效地分配…