Spark累加器(Accumulator)

1.累加器类型:

  • 数值累加器:用于计算总和、计数等。
  • 布尔累加器:用于计算满足特定条件的次数。
  • 自定义累加器:允许定义复杂的聚合逻辑和数据结构。
  • 集合累加器:用于计算唯一元素的数量,处理去重操作。

    在 Spark 中,累加器(Accumulators)是一种可以用来在任务执行过程中进行累积的变量。它们主要用于计算全局的汇总值,如计数或求和。累加器是只加的变量(即只进行累加操作),并且是分布式的,适合于在多节点环境中进行汇总。

2.示例:

2.1(数值累加器):假设我们有一个包含整数的 RDD,我们希望计算这些整数的总和,并使用累加器来进行累积。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------File Name:     1.测试累加器date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext# 初始化 SparkContext
sc = SparkContext("local[*]", "测试累加器")
# 创建累加器
accumulator = sc.accumulator(0)# 定义一个函数来增加累加器的值
def add_to_accumulator(x):global accumulatoraccumulator.add(x)# 创建一个 RDD
rdd = sc.parallelize([1, 2, 3, 4])# 使用 map 来应用函数,并累加值
rdd.foreach(lambda x: add_to_accumulator(x))# 由于累加器的值在行动操作之后才会被更新,所以需要使用行动操作触发计算
rdd.count()  # 触发计算# 打印累加器的值
print("Accumulated value:", accumulator.value)

2.2(自定义累加器):自定义累加器允许你定义自己的累加逻辑和数据结构。这些累加器可以包含复杂的聚合操作和自定义数据结构。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------File Name:     4.自定义累加器测试date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParamsc = SparkContext("local[*]", "自定义累加器测试")# 自定义累加器类
class ListAccumulatorParam(AccumulatorParam):def zero(self, value):return []def addInPlace(self, acc1, acc2):return acc1 + acc2list_accumulator = sc.accumulator([], ListAccumulatorParam())def add_to_list_accumulator(x):global list_accumulatorlist_accumulator.add([x])return xrdd = sc.parallelize([1, 2, 3, 4])
rdd.foreach(lambda x: add_to_list_accumulator(x))rdd.count()  # 触发计算print("Accumulated list:", list_accumulator.value)

解释
  • 自定义累加器类ListAccumulatorParam 定义了一个自定义累加器,zero 方法返回一个空列表,addInPlace 方法合并两个列表。
  • 创建自定义累加器list_accumulator = sc.accumulator([], ListAccumulatorParam()) 创建了一个自定义的累加器实例。
  • 更新累加器add_to_list_accumulator(x) 函数将每个元素作为列表加到累加器中。
  • 应用函数rdd.foreach(lambda x: add_to_list_accumulator(x))add_to_list_accumulator 函数应用到 RDD 的每个元素。
  • 触发计算rdd.count() 触发了 RDD 的计算,更新累加器的值。
  • 查看结果list_accumulator.value 获取累加器的最终值,即累加的列表。
  • RDD 中的每个元素 [1, 2, 3, 4] 被转换为单元素列表 [1], [2], [3], [4],并分别添加到累加器中。
  • 累加器的 addInPlace 方法将这些列表合并成一个完整的列表。
2.3(集合累加器):集合累加器用于跟踪独特的元素集合,例如计算唯一元素的数量。它可以用于去重操作。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------File Name:     3.集合累加器测试date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParamsc = SparkContext("local[*]", "集合累加器测试")# 自定义集合累加器类
class SetAccumulatorParam(AccumulatorParam):def zero(self, value):return set()def addInPlace(self, acc1, acc2):return acc1.union(acc2)set_accumulator = sc.accumulator(set(), SetAccumulatorParam())def add_to_set_accumulator(x):global set_accumulatorset_accumulator.add(set([x]))return xrdd = sc.parallelize([1, 2, 2, 3, 4, 4])
rdd.foreach(lambda x: add_to_set_accumulator(x))rdd.count()  # 触发计算print("Unique elements:", len(set_accumulator.value))
解释
  • 自定义累加器类SetAccumulatorParam 定义了一个自定义累加器,zero 方法返回一个空集合,addInPlace 方法合并两个集合。
  • 创建自定义累加器set_accumulator = sc.accumulator(set(), SetAccumulatorParam()) 创建了一个自定义的累加器实例。
  • 更新累加器add_to_set_accumulator(x) 函数将每个元素作为集合添加到累加器中。
  • 应用函数rdd.foreach(lambda x: add_to_set_accumulator(x))add_to_set_accumulator 函数应用到 RDD 的每个元素。
  • 触发计算rdd.count() 触发了 RDD 的计算,更新累加器的值。
  • 查看结果len(set_accumulator.value) 获取累加器的最终值,即唯一元素的数量。
  • RDD 中的元素 [1, 2, 2, 3, 4, 4] 被转换为集合形式,分别是 {1}, {2}, {2}, {3}, {4}, {4}
  • 每个元素的集合被添加到累加器中。由于累加器的合并逻辑是集合的并集,最终的累加器会包含所有唯一的元素,所以最后的计算结果是4。

3.累加器的特点:

  • 只加操作:累加器只能执行加操作,不能进行减操作或其他类型的操作。
  • 分布式支持:累加器在多节点环境下是分布式的,每个 Executor 都会在其本地更新累加器的值。最后,这些本地值会在 Driver 节点上进行合并。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3280418.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Study--Oracle-07-ASM常用操作(五)

一、向磁盘组添加磁盘 1、查看系统中可用的磁盘 set lines 150; col name for a35; col path for a35; select group_number,path, state, name, total_mb, free_mb from v$asm_disk; 2、磁盘组操作 创建磁盘组 create DISKGROUP DATADGV2 EXTERNAL REDUNDANCY DISK /dev/…

解决Qt3D程序场景中无法显示创建的立体图形?

有的新手在创建Qt3D程序时,因为不熟练,导致经常遇到无法显示3D图形的情况。 原因其实也简单,就是设置的摄像机的位置不对,或者压根没有设置摄像机。 // CameraQt3DRender::QCamera *cameraEntity view.camera();cameraEntity-&g…

Java二十三种设计模式-外观模式(9/23)

外观模式:简化复杂系统的统一接口 引言 外观模式(Facade Pattern)是一种结构型设计模式,它为子系统中的一组接口提供一个统一的高层接口。外观模式定义了一个可以与复杂子系统交互的简化接口,使得子系统更加易于使用…

Android 10.0 Launcher3仿ios的folder文件夹widget功能实现二

1.前言 在10.0的系统ROM开发中,在进行一些系统Launcher3定制功能开发中,需要实现folder文件夹widget的功能,由于launcher3 默认不支持folder跨行显示,所以就需要借助自定义的widget小部件功能来实现相关功能,接下来分析实现相关功能 2.Launcher3仿ios的folder文件夹widge…

jQuery前端网页制作

1、Jquery的概述 1.1JavaScript库 JavaScript 高级程序设计(特别是对浏览器差异的复杂处理),通常很困难也很耗时。 为了应对这些调整,许多的 JavaScript (helper) 库应运而生。 这些 JavaScript 库常被称为 JavaScript 框架。 市面上一些广受欢迎的 JavaScript 框架:…

大厂linux面试题攻略五之数据库管理

一、数据库管理-MySQL语句 0.MySQL基本语句: 1.SQL语句-增 创建xxx用户: mysql>create user xxx % indentified by 123456; xxx表示用户名 %b表示该用户用来连接数据库的方式(远程或本地连接) indentified by 123456设置密码…

Reranker技术

文章目录 Reranker技术0. 什么是RAG1. 什么是Reranker?2. Reranker在RAG技术中的应用3.使用 Reranker 的优缺点4.总结参考:知乎 Reranker技术 0. 什么是RAG 基础 RAG 的操作流程大致如下:首先,你需要将文本切分成小段&#xff0…

centos7 docker空间不足

今天在使用docker安装镜像的时候,出现报错 查看原因,发现是分区空间不足导致的 所以考虑进行扩容 首先在vmware扩容并没有生效 因为只是扩展的虚拟空间,并不支持扩展分区大小,下面对分区进行扩容 参考: 分区扩容 主…

细说MCU的DAC改变输出信号频率的方法

目录 一、参考硬件 二、改变输出信号的频率 1.建立新工程 2.配置TIM3 三、代码修改 四、查看结果 一、参考硬件 本项目依赖的软件和硬件工程参考本文作者写的文章:细说MCU的DAC1和DAC2各自输出一通道模拟信号的方法-CSDN博客 https://wenchm.blog.csdn.net/a…

【初阶数据结构篇】二叉树算法题

文章目录 二叉树算法题前言单值二叉树相同的树对称二叉树另一棵树的子树二叉树的前序遍历 二叉树算法题 前言 本篇的算法题涉及到链式结构二叉树的实现方法可参考:二叉链实现方法上篇二叉链实现方法下篇 单值二叉树 如果二叉树每个节点都具有相同的值,…

什么情况?我代码没了

前两天检视代码时,发现PR里面有两个提交的描述信息一模一样,于是我提出应该将这两个提交合并成一个,保持提交树的清晰。 1 先储存起来! 而同事这时正在开发别的特性,工作区不是干净的,没法直接执行 git r…

APDL(ANSYS Parametric Design Language)初识

APDL(ANSYS Parametric Design Language)编写涉及使用ANSYS的参数化设计语言来创建、修改和执行有限元分析(FEA)任务。以下是一些关于APDL编写的基本步骤、技巧和示例: 一、基本步骤 了解APDL基础: 熟悉AP…

如何在 Kali Linux 上安装和使用 Docker 和 Docker Compose

Docker 和 Docker Compose 是现代开发者必备的工具,特别是当你需要在不同的环境中部署应用时。本文将详细介绍如何在 Kali Linux 上安装 Docker 和 Docker Compose,并使用它们启动服务。即使你是个技术小白,也能轻松跟随这篇指南完成操作。 …

轻松搞定 Nginx 在 CentOS 和 Ubuntu 上的安装与配置

注:这是对我以前博客进行优化后再次发布的,博客中的截图为以前的。原博客已删除。 如何安装nginx nginx是一款开源、高性能的Web和反向代理服务器,支持HTTP、HTTPS、SMTP、POP3和IMAP协议。由于其轻量级、资源占用少和强大的并发能力&#…

基于vue2 + Ant Design 封装input(输入)下拉Table表格

封装 AInputTable 组件 <!--下拉Table--> <template><div class"input-select-table" ref"inputTableRef" v-clickoutside"handleHide"><div class"input-select-table-input" click"disabled?this:hand…

【C++] 认识C++(二)

前言 &#x1f4da;作者简介&#xff1a;爱编程的小马&#xff0c;是一名大厂后端c程序员。 &#x1f4da;本文收录于C系列&#xff0c;本专栏主要是分享我所了解的c知识&#xff0c;带领大家慢慢从了解c到认识c&#xff0c;持续更新&#xff01; &#x1f4da;相关专栏Linux正…

Base64解码时Illegal base64 character 20问题解决

一&#xff0c;问题 在使用Base64解码的时候 // 这里的keyContent是公钥&#xff0c;一般配置到配置中心里&#xff0c;然后注入到容器里 String publicKeyString keyContent .replaceAll("\\n", "") .replace("-----BEGIN PUBLIC KEY-----",…