SparkSQL学习03-数据读取与存储

文章目录

    • 1 数据的加载
      • 1.1 方式一:spark.read.format
        • 1.1.1读取json数据
        • 1.1.2 读取jdbc数据
      • 1.2 方式二:spark.read.xxx
        • 1.2.1 读取json数据
        • 1.2.2 读取csv数据
        • 1.2.3 读取txt数据
        • 1.2.4 读取parquet数据
        • 1.2.5 读取orc数据
        • 1.2.6 读取jdbc数据
    • 2 数据的保存
      • 2.1 方式一:spark.write.format
        • 2.1.1 读取orc数据
      • 2.2 方式二:spark.write.xxx
        • 2.2.1 写入到jdbc数据库中

SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不用格式的数据。SparkSQL默认读取和保存的文件格式为parquet,parquet是一种能够有效存储嵌套数据的列式存储格式。

1 数据的加载

SparkSQL提供了两种方式可以加载数据

1.1 方式一:spark.read.format

  • spark.read.format读取数据文件格式.load加载数据路径”
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 需要注意:在读取jdbc时需要在format和load之间添加多个option进行相应的JDBC参数设置【url、user、password.tablename】load中不用传递路经空参数即可
  • 数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format
1.1.1读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()//使用第一种范式加载数据var frame: DataFrame = session.read.format("json").load("data/people.json")frame.printSchema()/*** 运行结果:root|-- age: long (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)|-- province: string (nullable = true)*/frame.show()/*** 运行结果:+---+------+-------+--------+|age|height|   name|province|+---+------+-------+--------+| 10| 168.8|Michael|    广东|| 30| 168.8|   Andy|    福建|| 19| 169.8| Justin|    浙江|| 32| 188.8| 王启峰|    广东|| 10| 168.8|   John|    河南|| 19| 179.8|   Domu|    浙江|+---+------+-------+--------+* */}
}
1.1.2 读取jdbc数据

读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()// 如果读取的JDBC操作(即读取mysql中的数据)val frame = session.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/mydb1").option("dbtable","location_info").option("user","root").option("password","123456").load()frame.printSchema()}
}

1.2 方式二:spark.read.xxx

  • 上述的书写方式太过项,所以SparksQL推出了更加便捷的方式spark.read.xxx加载数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在读取jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
1.2.1 读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()//【推荐使用】第二种方式进行读取操作val frame = session.read.json("data/people.json")frame.printSchema()/**root|-- age: long (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)|-- province: string (nullable = true)*/frame.show()/**+---+------+-------+--------+|age|height|   name|province|+---+------+-------+--------+| 10| 168.8|Michael|    广东|| 30| 168.8|   Andy|    福建|| 19| 169.8| Justin|    浙江|| 32| 188.8| 王启峰|    广东|| 10| 168.8|   John|    河南|| 19| 179.8|   Domu|    浙江|+---+------+-------+--------+ */}}
1.2.2 读取csv数据

csv数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.csv("data/country.csv")frame.printSchema()/**root|-- _c0: string (nullable = true)|-- _c1: string (nullable = true)|-- _c2: string (nullable = true)*/frame.show()/**+---+----------------+---+|_c0|             _c1|_c2|+---+----------------+---+|  1|            中国|  1||  2|      阿尔巴尼亚|ALB||  3|      阿尔及利亚|DZA||  4|          阿富汗|AFG||  5|          阿根廷|ARG||  6|阿拉伯联合酋长国|ARE||  7|          阿鲁巴|ABW||  8|            阿曼|OMN||  9|        阿塞拜疆|AZE|| 10|        阿森松岛|ASC|| 11|            埃及|EGY|| 12|      埃塞俄比亚|ETH|| 13|          爱尔兰|IRL|| 14|        爱沙尼亚|EST|| 15|          安道尔|AND|| 16|          安哥拉|AGO|| 17|          安圭拉|AIA|| 18|安提瓜岛和巴布达|ATG|| 19|        澳大利亚|AUS|| 20|          奥地利|AUT|+---+----------------+---+*/}}
1.2.3 读取txt数据

txt数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.text("data/dailykey.txt")frame.printSchema()/**root|-- value: string (nullable = true)* */frame.show()/**+--------------------+|               value|+--------------------+|2018-11-13\ttom\t...||2018-11-13\ttom\t...||2018-11-13\tjohn\...||2018-11-13\tlucy\...||2018-11-13\tlucy\...||2018-11-13\tjohn\...||2018-11-13\tricha...||2018-11-13\tricha...||2018-11-13\tricha...||2018-11-14\ttom\t...||2018-11-14\ttom\t...||2018-11-14\ttom\t...|+--------------------+* */}}
1.2.4 读取parquet数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.parquet("data/users.parquet")frame.printSchema()/**root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)|    |-- element: integer (containsNull = true)*/frame.show()/*+------+--------------+----------------+|  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|          null|  [3, 9, 15, 20]||   Ben|           red|              []|+------+--------------+----------------+*/}
}
1.2.5 读取orc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.orc("data/student.orc")frame.printSchema()/**root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- age: string (nullable = true)|-- gender: string (nullable = true)|-- course: string (nullable = true)|-- score: string (nullable = true)*/frame.show()/**+---+------+---+------+-------+-----+| id|  name|age|gender| course|score|+---+------+---+------+-------+-----+| 12|  张三| 25|    男|chinese|   50|| 12|  张三| 25|    男|   math|   60|| 12|  张三| 25|    男|english|   70|| 12|  李四| 20|    男|chinese|   50|| 12|  李四| 20|    男|   math|   50|| 12|  李四| 20|    男|english|   50|| 12|  王芳| 19|    女|chinese|   70|| 12|  王芳| 19|    女|   math|   70|| 12|  王芳| 19|    女|english|   70|| 13|张大三| 25|    男|chinese|   60|| 13|张大三| 25|    男|   math|   60|| 13|张大三| 25|    男|english|   70|| 13|李大四| 20|    男|chinese|   50|| 13|李大四| 20|    男|   math|   60|| 13|李大四| 20|    男|english|   50|| 13|王小芳| 19|    女|chinese|   70|| 13|王小芳| 19|    女|   math|   80|| 13|王小芳| 19|    女|english|   70|+---+------+---+------+-------+-----+*/}
}
1.2.6 读取jdbc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()// 读取jdbc文件val properties = new Properties()properties.put("user","root")properties.put("password","123456")val frame = session.read.jdbc("jdbc:mysql://localhost:3306/mydb1","location-info",properties)frame.printSchema()frame.show()}
}

2 数据的保存

SparkSQL提供了两种方式可以保存数据

2.1 方式一:spark.write.format

  • spark.write.format(“保存数据格式”).mode(“存储格式”).save(“存储数据路径”)
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 保存数据可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置
  • SaveMode是一个枚举类,其中的常量包括:
scala/javaAny LanguageMeaning
SaveMode.ErrorifExists(default)“error”(default)如果文件已经存在,则抛出异常
SaveMode.Append“append”如果文件已经存在,则追加
SaveMode.Overwrite“overwrite”如果文件已经存在,则覆盖
SaveMode.Ignore“ignore”如果文件已经存在,则忽略

需要注意:在读取jdbc时需要在format和save之间添加多个option进行相应的JDBC参数设置【url、user、password、tablename】save中不用传递路经空参数即可,可以不用设置mode

数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format

2.1.1 读取orc数据
package _02SparkSQL
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData {def main(args: Array[String]): Unit = {//提供SparkSession对象val session = SparkSession.builder().appName("SparkWriteData").master("local").getOrCreate()//先读取数据var frame: DataFrame = session.read.orc("data/student.orc")//保存到某个路径下,OWstudent为文件夹,不需要文件名frame.write.format("json").mode(SaveMode.Overwrite).save("data/OWstudent")session.stop()}
}

最后结果为:
在这里插入图片描述

2.2 方式二:spark.write.xxx

上述的书写方式太过繁项,所以SparksQL推出了更加便捷的方式:

  • spark.write.xxx(“保存数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在保存jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
  • mode可以选择性设置
2.2.1 写入到jdbc数据库中
package _02SparkSQL
import java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData {def main(args: Array[String]): Unit = {//提供SparkSession对象val session = SparkSession.builder().appName("SparkWriteData").master("local").getOrCreate()//先读取数据var frame: DataFrame = session.read.orc("data/student.orc")val properties = new Properties()properties.put("user","root")properties.put("password","123456")frame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/mydb1","student",properties)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2803092.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

vue使用luckysheet时报错window.luckysheet.destroy is not a function

这里写自定义目录标题 vue使用luckysheet时报错window.luckysheet.destroy is not a function解决办法 vue使用luckysheet时报错window.luckysheet.destroy is not a function 按照教程 luckysheet教程: link 将需要的资源进行本地引入。 本地预览excel正常,但是放…

Tomcat 学习之 Filter 过滤器

目录 1 Filter 介绍 2 Filter 的生命周期 3 Filter 和 FilterChain 4 Filter 拦截过程 5 FilterConfig 6 Filter 使用 1 Filter 介绍 在 Tomcat 中,Filter 是一种用于拦截请求和过滤响应的组件,可以在请求到达 Servlet 之前或响应离开 Servlet 之后…

视频的语音转成文字字幕?这3个方法让你实现

随着网络的普及,越来越多的学生选择在网上观看辅导视频,以便随时随地学习。然而,整理这些视频中的教学笔记却成为了一个让人头疼的问题。传统的边看边记录的方式不仅费时费力,还容易遗漏重要信息。那么,有没有一种方法…

Spring Boot application.properties和application.yml文件的配置

在Spring Boot中,application.properties 和 application.yml 文件用于配置应用程序的各个方面,如服务器端口、数据库连接、日志级别等。这两个文件是Spring Boot的配置文件,位于 src/main/resources 目录下。 application.properties 示例 …

Jmeter基础(1) Mac下载安装启动

目录 Jmeter下载安装启动下载启动 Jmeter下载安装启动 注意⚠️:使用jmeter需要有java环境 下载 官网下载地址:https://jmeter.apache.org/ 会看到这里有两个版本,那么有什么区别么? Binaries是可执行版,直接下载解…

PostgreSQL 的实体化视图介绍

PostgreSQL 实体化视图提供一个强大的机制,通过预先计算并将查询结果集存储为物理表来提高查询性能。本教程将使用 DVD Rental Database 数据库作为演示例子,指导你在 PostgreSQL中创建实体化视图。 了解实体化视图 实体化视图是查询结果集的快照&…

网站访问免费升级成 HTTPS

步骤一:获取SSL证书 要将网站访问改为HTTPS,首先需要获取一个免费SSL证书。SSL证书是由受信任的证书颁发机构(CA)签发的数字证书,用于验证网站的身份并加密与用户的通信。可以通过购买或免费获取SSL证书。一些常见的免…

腾讯云宝塔Linux安装Mysql5.7

一、下载官方mysql包 wget http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm二、安装mysql包 rpm -ivh mysql-community-release-el7-5.noarch.rpm三、安装mysql yum install mysql-community-server -y四、启动数据库 systemctl start mysqld.service…

JNDI注入+RMI流程复现代码调试

前置知识 javax.naming:主要用于命名操作,它包含了命名服务的类和接口,该包定义了Context接口和InitialContext类;javax.naming.directory:主要用于目录操作,它定义了DirContext接口和InitialDir- Context…

使用jconsole监控SpringbootJVM(JDK11)

SpringBoot启动时增加参数: java -Djava.rmi.server.hostname服务器IP -Dcom.sun.management.jmxremotetrue \ -Dcom.sun.management.jmxremote.port1099 \ -Dcom.sun.management.jmxremote.rmi.port1099 \ -Dcom.sun.management.jmxremote.authenticatefalse \ -D…

C 嵌入式系统设计模式 10:中介者模式

本书的原著为:《Design Patterns for Embedded Systems in C ——An Embedded Software Engineering Toolkit 》,讲解的是嵌入式系统设计模式,是一本不可多得的好书。 本系列描述我对书中内容的理解。本文章描述访问硬件的设计模式之三&…

hash,以及数据结构——map容器

1.hash是什么? 定义:hash,一般翻译做散列、杂凑,或音译为哈希,是把任意长度的输入(又叫做预映射pre-image)通过散列算法变换成固定长度的输出, 该输出就是散列值。这种转换是一种压缩映射&…

谷歌浏览器文件下载不了的问题

问题: 谷歌浏览器测试环境a标签下载附件没问题,但是另一个环境下载闪了一下但是没有下载. 原因:测试环境http,附件下载链接是http,但是另一个环境网页地址是https,附件下载链接是http. Chrome将开始阻止“安全页面”(HTTPS)上所有“非安全子资源”&#…

BeikeShop跨境电商PHP商城源码

BeikeShop 一款开源好用的跨境电商系统,BeikeShop 是基于 Laravel 开发的一款开源商城系统主要面向外贸/跨境电商行业提供商品管理、订单管理、会员管理、支付、物流、系统管理等功能。 插件市场支持个人免签 BeikeShop系统亮点 1、系统代码100%开源 2、代码分层…

RabbitMQ开启MQTT协议支持

1)RabbitMQ启用MQTT插件 rootmq:/# rabbitmq-plugins enable rabbitmq_mqtt Enabling plugins on node rabbitmq: rabbitmq_mqtt The following plugins have been configured:rabbitmq_managementrabbitmq_management_agentrabbitmq_mqttrabbitmq_web_dispatch Ap…

Linux理解

VMware安装Linux安装 目录 VMware安装Linux安装 1.1 什么是Linux 1.2 为什么要学Linux 1.3 学完Linux能干什么 2.1 主流操作系统 2.2 Linux系统版本 VMware安装Linux安装 1.1 什么是Linux Linux是一套免费使用和自由传播的操作系统。 1.2 为什么要学Linux 1). 企业用人…

unity Aaimation Rigging使用多个约束导致部分约束失去作用

在应用多个约束时,在Hierarchy的顺序可能会影响最终的效果。例如先应用了Aim Constraint,然后再应用Two Bone Constraint,可能会导致Two Bone Constraint受到Aim Constraint的影响而失效。因此,在使用多个约束时,应该仔…

【Java程序设计】【C00293】基于Springboot的藏区特产销售平台(有论文)

基于Springboot的藏区特产销售平台(有论文) 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的藏区特产销售平台 本系统分为系统功能模块以及管理员功能模块。 系统功能模块:进入藏区特产销售平台页面中可以查看…

【Git】:远程仓库操作

远程仓库操作 一.理解版本控制系统二.远程仓库1.克隆2.Push操作3.fetch操作4. .gitnore文件 一.理解版本控制系统 我们⽬前所说的所有内容(⼯作区,暂存区,版本库等等),都是在本地!也就是在你的笔记本或者计…

前端输入框校验限制不能输入中文

一般我们在做表单的时候都会有表单校验,通常都是用element提供的表单验证的功能,只需要通过 rules 属性传入约定的验证规则,如下面这样 rules: {userName: [{validator: checkUsername,trigger: "blur",},{ validator: this.checkData, trigge…