【Python】PySpark

前言

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

Spark对Python语言的支持,重点体现在Python第三方库:PySpark

PySpark是由Spark官方开发的Python语言第三方库。

Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。

在这里插入图片描述

基础准备

安装

同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。

pip install pyspark或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

构建PySpark执行环境入口对象

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)# 打印PySpark的运行版本
print(sc.version)# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行需要Java环境,推荐jdk8

PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。

PySpark的编程,主要分为如下三大步骤:

在这里插入图片描述

数据输入

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

在这里插入图片描述

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将list/tuple/set/dict/str转换为PySpark的RDD对象

# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1, 2, 3])    
rdd2 = sc.parallelize((1, 2, 3))    
rdd3 = sc.parallelize({1, 2, 3})    
rdd4 = sc.parallelize({'key1': 'value1', 'key2': 'value2'}) 
rdd5 = sc.parallelize('hello')  # 输出RDD的内容,需要使用collect()
print(rdd1.collect())   # [1, 2, 3]
print(rdd2.collect())   # [1, 2, 3]
print(rdd3.collect())   # [1, 2, 3]
print(rdd4.collect())   # ['key1', 'key2']
print(rdd5.collect())   # ['h', 'e', 'l', 'l', 'o']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

注意:

  • 字符串会被拆分出一个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象

读取文件转RDD对象

PySpark也支持通过SparkContext入口对象来读取文件,构建出RDD对象。

先提前预备一个txt文件

hello
python
day
# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.textFile('E:\\code\\py-space\\8.27\\hello.txt')# 输出RDD的内容,需要使用collect()
print(rdd.collect())    # ['hello', 'python', 'day']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据计算

RDD对象内置丰富的:成员方法(算子)

map算子

将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

rdd.map(func)
# func: f:(T) -> U
# f: 表示这是一个函数
# (T) -> U 表示的是方法的定义:()表示无需传入参数,(T)表示传入1个参数
# T是泛型的代称,在这里表示 任意类型
# U是泛型的代称,在这里表示 任意类型# (T) -> U : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型不限
# (A) -> A : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型和传入参数类型一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])# 通过map方法将全部数据乘以10,传入参数为函数
rdd2 = rdd.map(lambda x: x * 10)# 输出RDD的内容,需要使用collect()
print(rdd2.collect())   # [10, 20, 30, 40, 50, 60]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

由于map()的返回值还是RDD对象,可以继续在尾部进行链式调用

rdd3 = rdd.map(lambda x: x * 10).map(lambda x: x + 9)

flatMap算子

对RDD执行map操作,然后进行解除嵌套操作。

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(['a b c', 'd e f'])# 输出RDD的内容,需要使用collect()
print(rdd.map(lambda x: x.split(' ')).collect())    # [['a', 'b', 'c'], ['d', 'e', 'f']]
print(rdd.flatMap(lambda x:x.split(' ')).collect())   # ['a', 'b', 'c', 'd', 'e', 'f']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey算子

针对KV型(二元元组)RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

rdd.reduceByKey(func)
# func: (V, V) -> V
# 接收2个传入参数(类型要一致),返回一个返回值,返回值类型和传入参数类型要求一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])# 输出RDD的内容,需要使用collect()
print(rdd.reduceByKey(lambda a, b: a+b).collect())  # [('b', 3), ('a', 2)]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey中的聚合逻辑是:比如有[1,2,3,4,5],然后聚合函数是:lambda a,b: a+b

在这里插入图片描述

注意:reduceByKey中接收的函数,只负责聚合,不理会分组;分组是自动by key来分组的

filter算子

过滤想要的数据进行保留。

rdd.filter(func)
# func: (T) -> bool
# 传入一个参数任意类型,返回值必须是True/False,返回是True的数据被保留,False的数据被丢弃

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])# 输出RDD的内容,需要使用collect()
print(rdd.filter(lambda x: x % 2 == 0).collect())  # [2, 4, 6]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

distinct算子

对RDD数据进行去重,返回新的RDD

rdd.distinct() # 无需传参

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 3, 2, 6])# 输出RDD的内容,需要使用collect()
print(rdd.distinct().collect())  # [6, 1, 2, 3]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

sortBy算子

对RDD数据进行排序,基于你指定的排序依据。

rdd.sortKey(func, ascending=False, numPartitions=1)
# func: (T) -> U:告知按照RDD中的哪个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序
# ascending:True升序,False降序
# numPartitions:用多少分区排序,全局排序需要设置为1

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('Aiw', 9), ('Tom', 6), ('Jack', 8), ('Bolb', 5)])# 输出RDD的内容,需要使用collect()
print(rdd.sortBy(lambda x: x[1], ascending=False,numPartitions=1).collect())  # [('Aiw', 9), ('Jack', 8), ('Tom', 6), ('Bolb', 5)]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据输出

collect算子

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。

rdd.collect()
# 返回值是一个List

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3])rdd_list: list = rdd.collect()print(rdd_list)   # [1, 2, 3]
print(type(rdd_list))   # <class 'list'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduce算子

对RDD数据集按照你传入的逻辑进行聚合

rdd.reduce(func)
# func:(T, T) -> T
# 传入2个参数,1个返回值,要求返回值和参数类型一致

在这里插入图片描述

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))print(rdd.reduce(lambda a, b: a+b))   # 45# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

take算子

取RDD的前N个元素,组合成List进行返回。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd_take: list = rdd.take(3)print(rdd_take)   # [1, 2, 3]
print(type(rdd_take))   # <class 'list'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

count算子

计算RDD有多少条数据,返回值是一个数字。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd_count: int = rdd.count()print(rdd_count)   # 9
print(type(rdd_count))   # <class 'int'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

saveAsTextFile算子

将RDD的数据写入文本文件中。支持本地写出、HDFS等文件系统。

注意事项:

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'
os.environ['HADOOP_HOME'] = 'D:/Hadoop-3.0.0'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd.saveAsTextFile('./8.27/output') # 运行之前确保输出文件夹不存在,否则报错# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

上述代码输出结果,输出文件夹内有多个分区文件

修改RDD分区为1个

方式一:SparkConf对象设置属性全局并行度为1:

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')
# 设置属性全局并行度为1
conf.set('spark.default.parallelism','1')
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

方式二:创建RDD的时候设置(parallelize方法传入numSlices参数为1)

rdd = sc.parallelize(range(1, 10), numSlices=1)
rdd = sc.parallelize(range(1, 10), 1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/1620313.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Xposed常用逆向函数

1. 创建Xposed工程 在Android Studio中新建一个app工程&#xff0c;修改其中的 AndroidManifest.xml 文件&#xff0c;在<application></application>标签中增加如下代码 <meta-dataandroid:name"xposedmodule"android:value"true" />…

Xposed环境安装

一、Xposed 框架实现 Hook 的原理介绍 Zygote是Android的核心&#xff0c;每运行一个app&#xff0c;Zygote就会fork一个虚拟机实例来运行app&#xff0c; Xposed Framework深入到了Android核心机制中&#xff0c;通过改造Zygote来实现一些很牛逼的 功能。Zygote的启动配置在i…

Xposed 使用教程

Xposed作为Android开发中的神器&#xff0c;功能强大之处就不做过多介绍了&#xff0c;本文主要讲解一些常用的API&#xff0c;基本包含常用的Hook操作。 Hook静态变量 Class cla XposedHelpers.findClass(claName, loadPackageParam.classLoader); XposedHelpers.setStatic…

xposed android 4.4.2,xposed新版54

xposed新版54是一款好用的系统工具&#xff0c;软件安全&#xff0c;无需root权限&#xff0c;下载相对应功能板块即可在应用内实现应用多开、虚拟步数、以及qq&#xff0c;微信等多种功能&#xff0c;方便又实用&#xff01; 软件介绍 系统功能增强&#xff0c;如后台管制&…

Xposed安装

记录一下自己安装xposed的过程。网上很多xposed的安装教程&#xff0c;里面各种都是直接跳转到官网地址下载Xposed&#xff0c;但国内打不开&#xff0c;提示如下&#xff1a; 因此只能下载对应版本zip包进行本地安装&#xff0c;下载对应zip包放到“ /sdcard/Android/data/de…

android8 检测xposed,Xposed检测与自定义Xposed

Xposed检测与自定义Xposed 前言: Xposed检测 1、遍历App安装列表检测 2、自造异常检测堆栈信息。 3、检查关键Java方法是否变为native方法 4、反射XposedHelper类和XposedBridge类 5、检测Xposed相关文件 6、Root检测 7、安全建议 自定义Xposed 一、修改XposedBridge.jar包名 …

xposed android4.4,应用管理Xposed

应用管理Xposed是一款安卓应用管理xposed模块&#xff0c;可以帮助你更好的管理自己手机的各种应用的权限&#xff0c;应用使用需要先阅读了解一下使用的方法&#xff0c;非常强大的一款插件&#xff0c;欢迎大家前来下载。 新版特性 1. 为部分列表也添加基本筛选。 2. 在主页显…

android xposed 简书,Xposed 入坑篇

device-2018-04-12-101001.png 接下来开始敲代码了 美滋滋(皮一下很开心) 上一张整个工程的图 QQ截图20180412102021.png 以下是Test和Tutorial的代码 package com.zed.xposed.demo; import de.robv.android.xposed.IXposedHookLoadPackage; import de.robv.android.xposed.Xpo…

Xposed 入门

Xposed的原理 Android基于Linux&#xff0c;第一个启动的进程自然是init进程&#xff0c;该进程会 启动所有Android进程的父进程——Zygote(孵化)进程&#xff0c;该进程的启动配置在 /init.rc脚本中&#xff0c;而Zygote进程对应的执行文件是/system/bin/app_process&#xf…

Xposed

[Xposed framework] (概述) 记录学习文章&#xff0c;这里介绍一下Xposed framework&#xff0c;它可以让你修改ROM&#xff0c;无需修改任何APK或flashing.快速入门&#xff0c;请看Xposed上XDA 之android basic 101&#xff1a; http://www.youtube.com/watch?vuRR0Flqx9M8 …

Xposed安装与使用

xposed是什么&#xff1f; 一个很牛逼的框架&#xff0c;可以在不修改APK的情况下影响程序的运行&#xff0c;比如&#xff1a; 直接把APP的界面改成自己想要的样去掉界面里不喜欢的东西&#xff0c;自动抢红包消息防撤回步数修改等等 Xposed的工作原理 在开始修改之前&…

【Android】Xposed 框架解析

前言 Xposed这位老兄大家可能不认识&#xff0c;微信自动抢红包大家听过吧、微信记录器作弊大家听过吧、地理位置模拟大家听过吧&#xff0c;我很负责任的告诉大家&#xff0c;这些都是Xposed干的&#xff0c;对的&#xff0c;就是它&#xff0c;相信大家充着“谁抢我红包”的…

Android之Xposed框架完全使用指南

文章目录 Xposed环境搭建Xposed简介Xposed原理Xposed的安装 Xposed插件开发Xposed插件编写流程Xposed开发之Hook构造函数相关API无参构造函数的hook有参构造函数的hook实际效果 Xposed开发之修改属性相关API修改静态字段和成员字段实际效果 Xposed开发之hook一般函数相关APIhoo…

深入理解Android(三):Xposed详解

编者按&#xff1a;随着移动设备硬件能力的提升&#xff0c;Android系统开放的特质开始显现&#xff0c;各种开发的奇技淫巧、黑科技不断涌现&#xff0c;InfoQ特联合《深入理解Android》系列图书作者邓凡平&#xff0c;开设深入理解Android专栏&#xff0c;探索Android从框架到…

Xposed入门教程

2019年8月27日16时51分47秒以前一直没机会接触Android Hook方式的逆向今天有空试了下&#xff0c;以前也很少写这种东西&#xff0c;今天第一次&#xff0c;认真写下&#xff0c;记录一下?准备 准备搞太极的&#xff0c;但是Xposed都不会&#xff0c;不好搞&#xff0c;所以就…

RabbitMQ---订阅模型-Topic

订阅模型-Topic • Topic类型的Exchange与Direct相比&#xff0c;都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符&#xff01; • Routingkey 一般都是有一个或多个单词组成&#xff0c;多个单词之间以…

PostgreSQL命令行工具psql常用命令

1. 概述 通常情况下操作数据库使用图形化客户端工具&#xff0c;在实际工作中&#xff0c;生产环境是不允许直接连接数据库主机&#xff0c;只能在跳板机上登录到Linux服务器才能连接数据库服务器&#xff0c;此时就需要使用到命令行工具。psql是PostgreSQL中的一个命令行交互…

pyreverse+Graphviz 快速理清整个项目中的代码结构

作用 &#xff1a; 分析代码中的调用关系&#xff0c;帮助快速理清代码。 安装方法&#xff1a;以windows为例 • 从官网下载 https://www.graphviz.org/download/ 安装• 记得将其添加到系统路径• 运行下面命令来检查安装是否完成&#xff1a; dot -V• 安装Pyreverse&…

配置web服务

Web服务器又称为WWW服务器&#xff0c;它是放置一般网站的服务器。一台Web服务器上可以建立多个网站&#xff0c;各网站的拥有者只需要把做好的网页和相关文件放置在Web服务器的网站中&#xff0c;其它用户就可以用浏览器访问网站中的网页了。 LAMP是Linux, Apache, MySQL, PH…

Web Service(Web服务)

什么是webservice&#xff1f; 一句话概括&#xff1a;WebService是一种跨编程语言和跨操作系统平台的远程调用技术。 所谓跨编程语言和跨操作平台&#xff0c;就是说服务端程序采用Java编写&#xff0c;客户端程序则可以采用其他编程语言编写&#xff0c;反之亦然&#xff01…