Spark实时(五):InputSource数据源案例演示

文章目录

InputSource数据源案例演示

一、​​​​​​​File Source

1、读取text文件

2、读取csv文件

3、读取json文件

二、Socket Source 

三、Rate Source


InputSource数据源案例演示

在Spark2.0版本之后,DataFrame和Dataset可以表示静态有边界的数据,也可以表示无边界的流式数据。在Structured Streaming中我们可以使用SparkSession针对流式数据源创建对应的Dataset或者DataFrame,并可以像处理批数据一样使用各种Operators操作处理流式数据。

Structured Streaming的数据源目前支持File Source 、Socket Source 、Rate Source、Kafka Source ,与Kafka的整合在后续整理,这里对其他三种数据源分别演示。

一、​​​​​​​​​​​​​​File Source

Sturctured Streaming可以读取写入目录的文件作为数据流,文件将按照文件修改时间的顺序进行处理,文件必须原子性的存入到监控目录中,支持的格式有text、csv、json、orc、parquet。

1、读取text文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/***  Structured Streaming监控目录 text格式数据*/
object SSReadTextData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadTextData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.监控目录val ds: Dataset[String] = spark.readStream.textFile("./data/")val result: DataFrame = ds.map(line => {val arr: Array[String] = line.split("-")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

 结果:

Java代码如下:

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class SSReadTextData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<String> ds = spark.readStream().textFile("./data/");Dataset<Tuple3<Integer, String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {@Overridepublic Tuple3<Integer, String, Integer> call(String line) throws Exception {String[] arr = line.split("-");return new Tuple3<>(Integer.valueOf(arr[0]), arr[1],Integer.valueOf(arr[2]) );}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()));Dataset<Row> result = ds2.toDF("id", "name", "age");result.writeStream().format("console").start().awaitTermination();}
}

 结果:

以上代码编写完成之后,向监控的目录“./data”中不断写入含有以下内容的文件,可以看到控制台有对应的流数据输出,这里一定是原子性的将文件复制到对应目录下。文件内容如下:

1-zhangsan-18
2-lisi-19
3-ww-20

2、读取csv文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType/*** Structured Streaming 读取CSV数据*/
object SSReadCsvData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.创建CSV数据schemaval userSchema: StructType = new StructType().add("id", "integer").add("name", "string").add("gender", "string").add("age", "integer")val result: DataFrame = spark.readStream.option("sep", ",").schema(userSchema).csv("./data/")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;/*** Structured Streaming 读取CSV数据*/public class SSReadCsvData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");StructType userSchema = new StructType().add("id", "integer").add("name", "string").add("gender", "string").add("age", "integer");Dataset<Row> result = spark.readStream().option("sep", ",").schema(userSchema).csv("./data/");result.writeStream().format("console").start().awaitTermination();}
}

 结果:

以上代码运行之后向对应监控的目录下原子性写入含有数据的csv文件,在控制台可以看到实时监控内容。文件内容如下:

1,zhangsan,一班,100
2,lisi,二班,200
3,wangwu,一班,300
4,maliu,二班,100
5,tianqi,三班,100
6,gaoba,三班,50
7,zs2,四班,50

3、读取json文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType/***  Structured Streaming 监控Json格式数据*/
object SSReadJsonData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.创建 json 数据schemaval userSchema: StructType = new StructType().add("id", "integer").add("name", "string").add("age", "integer")val result: DataFrame = spark.readStream.schema(userSchema).json("./data/")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;/*** Structured Streaming实时监控目录中json文件作为数据流*/
public class SSReadJsonData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().appName("File Source test").master("local").getOrCreate();//2.设置日志spark.sparkContext().setLogLevel("Error");//3.设置SchemaStructType userSchema = new StructType().add("id", "integer").add("name", "string").add("age", "integer");//4.指定监控目录读取数据json数据Dataset<Row> ds = spark.readStream().option("sep", ",").schema(userSchema).json("./data/");//5.打印数据到控制台StreamingQuery query =ds.writeStream().format("console").start();query.awaitTermination();}
}

结果:

以上代码启动之后,向监控的目录“./data”下原子写入含有以下内容的json文件,在控制台可以看到实时监控内容。json文件内容如下:

{"id":1,"name":"zs","age":18}
{"id":2,"name":"ls","age":19}
{"id":3,"name":"ww","age":20}
{"id":4,"name":"ml","age":21}

注意:实时监控json格式数据时,创建的Schema 中的字段需要与Json中的属性保持一致,否则在映射成表时,Schema中含有但在Json中没有的属性的字段对应的数据会为null。

二、Socket Source 

读取Socket方式需要指定对应的host和port,读取Socket数据源多用于测试场景,这里不再演示。

可以参考案例:

Spark实时(三):Structured Streaming入门案例-CSDN博客

三、Rate Source

Rate Source是以每秒指定的行数生成数据,每个输出行包含一个timestamp和value,其中timestamp是一个Timestamp含有信息分配的时间类型,value是从0开始的Long类型的数据,Rate Source式多用于测试。

scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, SparkSession}/*** SSRateSource*/
object SSRateSource {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("rate test")
//      .config("spark.sql.shuffle.partitions", 1).getOrCreate()val result: DataFrame = spark.readStream.format("rate")// 配置每秒生成多少行数据,默认1行.option("rowsPerSecond", "10").option("numPartitions", 5).load()result.writeStream.format("console").option("numRows","100").option("truncate","false").start().awaitTermination()}}

结果:

Java代码如下:

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;public class ssratesource01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("rate test").getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> result = spark.readStream().format("rate")// 配置每秒生成多少行数据,默认1行.option("rowsPerSecond", "10").option("numPartitions", 5).load();result.writeStream().format("console").option("numRows","100").option("truncate","false").start().awaitTermination();}
}

结果: 

 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3266845.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Pytest-基本使用

概念 pytest 是 python 的一种单元测试框架&#xff0c;同自带的 Unittest 测试框架类似&#xff0c;相比于 Unittest 框架使用起来更简洁&#xff0c; 效率更高。 特点 1. 非常容易上手&#xff0c;入门简单&#xff0c;文档丰富&#xff0c;文档中有很多实例可以参考 2. 支…

算法-----递归~~搜索~~回溯(宏观认识)

目录 1.什么是递归 1.1二叉树的遍历 1.2快速排序 1.3归并排序 2.为什么会用到递归 3.如何理解递归 4.如何写好一个递归 5.什么是搜索 5.1深度&#xff08;dfs&#xff09;优先遍历&优先搜索 5.2宽度&#xff08;bfs&#xff09;优先遍历&优先搜索 6.回溯 1.什…

《Redis设计与实现》读书笔记-数据结构(SDS)

目录 SDS定义 SDS结构 SDS与C字符串结构差异 SDS优点 SDS扩容策略 SDS惰性空间回收 SDS定义 SDS&#xff08;简单动态字符串&#xff09;&#xff0c;用于代替C语言自身的字符串&#xff08;字符容量与字符数组强相关&#xff09;。 SDS结构 sdshdr{int free //sds 中…

一下午连续故障两次,谁把我们接口堵死了?!

唉。。。 大家好&#xff0c;我是程序员鱼皮。又来跟着鱼皮学习线上事故的处理经验了喔&#xff01; 事故现场 周一下午&#xff0c;我们的 编程导航网站 连续出现了两次故障&#xff0c;每次持续半小时左右&#xff0c;现象是用户无法正常加载网站&#xff0c;一直转圈圈。 …

Golang | 腾讯一面

go的调度 Golang的调度器采用M:N调度模型&#xff0c;其中M代表用户级别的线程(也就是goroutine)&#xff0c;而N代表的事内核级别的线程。Go调度器的主要任务就是N个OS线程上调度M个goroutine。这种模型允许在少量的OS线程上运行大量的goroutine。 Go调度器使用了三种队列来…

Lua脚本简单理解

目录 1.安装 2.语法 2.1Lua数据类型 2.2变量 2.3lua循环 2.4流程控制 2.5函数 2.6运算符 2.7关系运算符 3.lua脚本在redis中的使用 3.1lua脚本再redis简单编写 3.2普通锁Lua脚本 3.3可重入锁lua脚本 1.安装 centos安装 安装指令&#xff1a; yum -y update yum i…

mysql面试(六)

前言 本章节详细讲解了一下mysql执行计划相关的属性释义&#xff0c;以及不同sql所出现的不同效果 执行计划 一条查询语句经过mysql查询优化器的各种基于成本和各种规则优化之后&#xff0c;会生成一个所谓的 执行计划&#xff0c;这个执行计划展示了这条查询语句具体查询方…

解决zabbix-server7 中文乱码问题

系统使用centos9 安装中文支持 yum install -y fontconfig langpacks-zh_CN.noarch 检查是否已有中文字体&#xff1a; fc-list :langzh 看到 直接使用GOOGLE的字体 ln -fs /usr/share/fonts/google-noto-cjk/NotoSansCJK-DemiLight.ttc /etc/alternatives/zabbix-web-fo…

Godot入门 05收集物品

创建新场景&#xff0c;添加Area2D节点&#xff0c;AnimatedSprite2D节点 &#xff0c;CollisionShape2D节点 添加硬币 按F键居中&#xff0c;放大视图。设置动画速度设为10FPS&#xff0c;加载后自动播放&#xff0c;动画循环 碰撞形状设为圆形&#xff0c;修改Area2D节点为Co…

python+vue3+onlyoffice在线文档系统实战20240725笔记,首页开发

解决遗留问题 内容区域的高度没有生效&#xff0c;会随着菜单的高度自动变化。 解决方案&#xff1a;给侧边加上一个最小高度。 首页设计 另一种设计&#xff1a; 进来以后&#xff0c;是所有的文件夹和最近的文件。 有一张表格&#xff0c;类似于Windows目录详情&…

MySQL窗口函数详解

MySQL窗口函数详解 MySQL从8.0版本开始引入了窗口函数&#xff0c;这是一个强大的特性&#xff0c;可以大大简化复杂的数据分析任务。本文将详细介绍MySQL窗口函数的概念、语法和常见用法&#xff0c;并结合实际应用场景进行说明。 什么是窗口函数? 窗口函数是一种能够对结…

git 版本回退-idea

1、选中项目&#xff0c;右键&#xff0c;打开 git历史提交记录 2、选中想要回退的版本&#xff0c;选择 hard&#xff08;不保留版本记录&#xff09; 3、最终选择强制提交&#xff08;必须强制&#xff09; OK&#xff0c;搞定

AI(Adobe lliustrator)教程+软件包

简介&#xff1a; 软件主要应用于印刷出版、海报书籍排版、专业插画、多媒体图像处理和互联网页面的制作等&#xff0c;也可以为线稿提供较高的精度和控制&#xff0c;适合生产任何小型设计到大型的复杂项目。 通常用于创建LOGO(商标或徽标)&#xff0c;图标&#xff0c;插图…

go语言学习文档精简版

Go语言是一门开源的编程语言&#xff0c;目的在于降低构建简单、可靠、高效软件的门槛。Go平衡了底层系统语言的能力&#xff0c;以及在现代语言中所见到的高级特性。 你好&#xff0c;Go package main // 程序组织成包import "fmt" // fmt包用于格式化输出数据// …

C++ primer plus 第16章string 类和标准模板库, 函数符概念

C primer plus 第16章string 类和标准模板库, 函数符概念 C primer plus 第16章string 类和标准模板库, 函数符概念 文章目录 C primer plus 第16章string 类和标准模板库, 函数符概念16.5.1 函数符概念程序清单16.15 functor.cpp 16.5.1 函数符概念 正如 STL定义了容器和迭代…

20240725项目的maven环境报红-重新配置maven

1.在编辑器里面打开项目&#xff0c;导入源码 &#xff08;1&#xff09;找到项目的地址C:\Users\zzz\IdeaProjects\datasys&#xff0c;然后右击用idea编辑器打开。 &#xff08;2&#xff09;idea中上菜单栏打开open&#xff0c;然后输入file&#xff0c;选择源代码文件 2.…

C++ //练习 15.28 定义一个存放Quote对象的vector,将Bulk_quote对象传入其中。计算vector中所有元素总的net_price。

C Primer&#xff08;第5版&#xff09; 练习 15.28 练习 15.28 定义一个存放Quote对象的vector&#xff0c;将Bulk_quote对象传入其中。计算vector中所有元素总的net_price。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 代码块&am…

openFeign配置okhttp

原来的项目出现了性能问题&#xff0c;老大不知道怎么的&#xff0c;让我改openFeign线程池为okhttp&#xff0c;说原生的不支持线程池性能比较差。 原openFeign配置文章地址 一、pom文件 <dependency><groupId>org.springframework.cloud</groupId><arti…

泰金新能估值暴增之谜:研发费用率远低同行,资产负债率居高不下

《港湾商业观察》施子夫 王璐 作为新“国九条”首家受理的科创板IPO企业&#xff0c;外界对于西安泰金新能科技股份有限公司&#xff08;以下简称&#xff0c;泰金新能&#xff09;的关注度自然相当之高。 泰金新能保荐机构为中信建投。通过招股书不难看出&#xff0c;公司的…

idea中导入外部依赖并打包到jar包中

前言&#xff1a; 很多时候在我们写项目对接三方的时候都需要导入三方jar包&#xff0c;而这时候我们用平常的pom里面写依赖发现导入不了&#xff08;直接把jar包放在本地导入的话打包的话也不会将该依赖打包进我们项目的jar包&#xff09;&#xff0c;我在网上找了几种方法 …