Spark(38):Streaming DataFrame 和 Streaming DataSet 转换

目录

0. 相关文章链接

1. 基本操作

1.1. 弱类型 api

1.2. 强类型

1.3. 直接执行 sql

2. 基于 event-time 的窗口操作

2.1. event-time 窗口理解

2.2. event-time 窗口生成规则

3. 基于 Watermark 处理延迟数据

3.1. 什么是 Watermark 机制

3.2. update 模式下使用 watermark

3.3. append 模式下使用 wartermark

3.4. watermark 机制总结

4. 流数据去重

5. join操作

5.1. Stream-static Joins

5.1.1. 内连接

5.1.2. 外连接

5.2. Stream-stream Joins

5.2.1. inner join

4.2.2. outer join

6. Streaming DF/DS 不支持的操作


0. 相关文章链接

 Spark文章汇总 

1. 基本操作

在 DF/DS 上大多数通用操作都支持作用在 Streaming DataFrame/Streaming DataSet 上。

准备处理数据: people.json

{"name": "Michael","age": 29,"sex": "female"}
{"name": "Andy","age": 30,"sex": "male"}
{"name": "Justin","age": 19,"sex": "male"}
{"name": "Lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}

1.1. 弱类型 api

代码示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 创建格式,并读取数据val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 弱类型 apival df: DataFrame = peopleDF.select("name", "age", "sex").where("age > 20")df.writeStream.outputMode("append").format("console").start.awaitTermination()// 关闭执行环境spark.stop()}
}

结果输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+

1.2. 强类型

代码示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 创建格式,并读取数据val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 强类型,转成 dsval peopleDS: Dataset[People] = peopleDF.as[People]val df: Dataset[String] = peopleDS.filter((_: People).age > 20).map((_: People).name)df.writeStream.outputMode("append").format("console").start.awaitTermination()// 关闭执行环境spark.stop()}
}case class People(name: String, age: Long, sex: String)

结果输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+
|  value|
+-------+
|Michael|
|   Andy|
|zhiling|
+-------+

1.3. 直接执行 sql

代码示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 创建格式,并读取数据val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 直接执行SQL,创建临时表peopleDF.createOrReplaceTempView("people")val df: DataFrame = spark.sql("select * from people where age > 20")df.writeStream.outputMode("append").format("console").start.awaitTermination()// 关闭执行环境spark.stop()}
}

结果输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+

2. 基于 event-time 的窗口操作

2.1. event-time 窗口理解

        在 Structured Streaming 中, 可以按照事件发生时的时间对数据进行聚合操作, 即基于 event-time 进行操作。在这种机制下, 即不必考虑 Spark 陆续接收事件的顺序是否与事件发生的顺序一致, 也不必考虑事件到达 Spark 的时间与事件发生时间的关系。因此, 它在提高数据处理精度的同时, 大大减少了开发者的工作量。我们现在想计算 10 分钟内的单词, 每 5 分钟更新一次, 也就是说在 10 分钟窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之间收到的单词量。 注意, 12:00 - 12:10 表示数据在 12:00 之后 12:10 之前到达。现在,考虑一下在 12:07 收到的单词。单词应该增加对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。因此,计数将由分组键(即单词)和窗口(可以从事件时间计算)索引。

统计后的结果应该是这样的:

代码示例:

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket数据源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).option("includeTimestamp", value = true) // 给产生的数据自动添加时间戳.load// 把行切割成单词, 保留时间戳val words: DataFrame = lines.as[(String, Timestamp)].flatMap((line: (String, Timestamp)) => {line._1.split(" ").map(((_: String), line._2))}).toDF("word", "timestamp")// 按照窗口和单词分组, 并且计算每组的单词的个数,最后按照窗口排序val wordCounts: Dataset[Row] = words.groupBy(// 调用 window 函数, 返回的是一个 Column 类型// 参数 1: df 中表示时间戳的列// 参数 2: 窗口长度// 参数 3: 滑动步长window($"timestamp", "60 seconds", "10 seconds"),$"word").count().orderBy($"window")wordCounts.writeStream.outputMode("complete").format("console").option("truncate", "false") // 不截断.为了在控制台能看到完整信息, 最好设置为 false.start.awaitTermination()// 关闭执行环境spark.stop()}
}

结果输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a   |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a   |3    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2    |
+------------------------------------------+----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a   |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a   |3    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2    |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|a   |2    |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|abc |1    |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|a   |2    |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|abc |1    |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|a   |2    |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|abc |1    |
|[2023-08-02 21:19:40, 2023-08-02 21:20:40]|a   |2    |
+------------------------------------------+----+-----+
only showing top 20 rows

由此可以看出, 在这种窗口机制下, 无论事件何时到达, 以怎样的顺序到达, Structured Streaming 总会根据事件时间生成对应的若干个时间窗口, 然后按照指定的规则聚合。

2.2. event-time 窗口生成规则

可以查看 org.apache.spark.sql.catalyst.analysis.TimeWindowing 类下的如下代码:

The windows are calculated as below:
maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)windowId <- ceil((timestamp - startTime) / slideDuration)windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTimewindowEnd <- windowStart + windowDurationreturn windowStart, windowEnd

        将event-time 作为“初始窗口”的结束时间, 然后按照窗口滑动宽度逐渐向时间轴前方推进, 直到某个窗口不再包含该 event-time 为止。 最终以“初始窗口”与“结束窗口”之间的若干个窗口作为最终生成的 event-time 的时间窗口。

每个窗口的起始时间与结束时间都是前必后开的区间, 因此初始窗口和结束窗口都不会包含 event-time, 最终不会被使用。

得到窗口如下:

3. 基于 Watermark 处理延迟数据

3.1. 什么是 Watermark 机制

        在数据分析系统中, Structured Streaming 可以持续的按照 event-time 聚合数据, 然而在此过程中并不能保证数据按照时间的先后依次到达。 例如: 当前接收的某一条数据的 event-time 可能远远早于之前已经处理过的 event-time。 在发生这种情况时, 往往需要结合业务需求对延迟数据进行过滤。现在考虑如果事件延迟到达会有哪些影响。 假如, 一个单词在 12:04(event-time) 产生, 在 12:11 到达应用。 应用应该使用 12:04 来在窗口(12:00 - 12:10)中更新计数, 而不是使用 12:11。 这些情况在我们基于窗口的聚合中是自然发生的, 因为结构化流可以长时间维持部分聚合的中间状态。

        但是, 如果这个查询运行数天, 系统很有必要限制内存中累积的中间状态的数量。 这意味着系统需要知道何时从内存状态中删除旧聚合, 因为应用不再接受该聚合的后期数据。为了实现这个需求, 从 spark2.1, 引入了 watermark(水印), 使用引擎可以自动的跟踪当前的事件时间, 并据此尝试删除旧状态。通过指定 event-time 列和预估事件的延迟时间上限来定义一个查询的 watermark。 针对一个以时间 T 结束的窗口, 引擎会保留状态和允许延迟时间直到(max event time seen by the engine - late threshold > T)。 换句话说, 延迟时间在上限内的被聚合, 延迟时间超出上限的开始被丢弃。

        可以通过withWatermark() 来定义watermark,watermark 计算方式:watermark = MaxEventTime - Threshhod;而且, watermark只能逐渐增加, 不能减少。

Structured Streaming 引入 Watermark 机制, 主要是为了解决以下两个问题:

  • 处理聚合中的延迟数据
  • 减少内存中维护的聚合状态.

注意:在不同输出模式(complete, append, update)中, Watermark 会产生不同的影响。

3.2. update 模式下使用 watermark

在 update 模式下, 仅输出与之前批次的结果相比, 涉及更新或新增的数据。

代码示例如下:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket数据源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 输入的数据中包含时间戳, 而不是自动添加的时间戳val words: DataFrame = lines.as[String].flatMap((line: String) => {val split: Array[String] = line.split(",")split(1).split(" ").map(((_: String), Timestamp.valueOf(split(0))))}).toDF("word", "timestamp")// 使用 withWatermark 方法,添加watermark, 参数 1: event-time 所在列的列名 参数 2: 延迟时间的上限.val wordCounts: Dataset[Row] = words.withWatermark("timestamp", "2 minutes").groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word").count()// 数据输出val query: StreamingQuery = wordCounts.writeStream.outputMode("update").trigger(Trigger.ProcessingTime(1000)).format("console").option("truncate", "false").startquery.awaitTermination()// 关闭执行环境spark.stop()}
}

初始化的wartmark是 0,通过如下输入的几条数据,可以看到水位线的变化。

第一次输入数据:  2023-08-07 10:55:00,dog 。这个条数据作为第一批数据。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 个窗口。 由于是第一批, 所有的窗口的结束时间都大于 wartermark(0), 所以 5 个窗口都显示,如下所示:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
+------------------------------------------+----+-----+

然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark. 本批次只有一个数据(10:55), 所有: watermark = 10:55 - 2min = 10:53 。

第二次输入数据:  2023-08-07 11:00:00,dog 。 这条数据作为第二批数据, 计算得到 5 个窗口。 此时的watermark=10:53, 所有的窗口的结束时间均大于 watermark。 在 update 模式下, 只输出结果表中涉及更新或新增的数据。

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:58:00, 2023-08-07 11:08:00]|dog |1    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |2    |
|[2023-08-07 10:56:00, 2023-08-07 11:06:00]|dog |1    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |2    |
|[2023-08-07 11:00:00, 2023-08-07 11:10:00]|dog |1    |
+------------------------------------------+----+-----+

其中: count 是 2 的表示更新, count 是 1 的表示新增。 没有变化的就没有显示(但是内存中仍然保存着)。此时的的 watermark = 11:00 - 2min = 10:58 。如下数据为在内存中保存着,但是没有打印出来的数据:

|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1    |

第三次输入数据:   2023-08-07 10:55:00,dog  。 这条数据作为第 3 批次,相当于一条延迟数据,计算得到 5 个窗口。此时的 watermark = 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58。

|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |

则立即删除这两个窗口在内存中的维护状态。 同时, 当前批次中新加入的数据所划分出来的窗口, 如果窗口结束时间低于 11:58, 则窗口会被过滤掉。

所以这次输出结果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |2    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |3    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |3    |
+------------------------------------------+----+-----+

第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变(因为 watermask 只能增加不能减少)。

3.3. append 模式下使用 wartermark

代码示例如下:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket数据源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 输入的数据中包含时间戳, 而不是自动添加的时间戳val words: DataFrame = lines.as[String].flatMap((line: String) => {val split: Array[String] = line.split(",")split(1).split(" ").map(((_: String), Timestamp.valueOf(split(0))))}).toDF("word", "timestamp")// 使用 withWatermark 方法,添加watermark, 参数 1: event-time 所在列的列名 参数 2: 延迟时间的上限.val wordCounts: Dataset[Row] = words.withWatermark("timestamp", "2 minutes").groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word").count()// 数据输出val query: StreamingQuery = wordCounts.writeStream.outputMode("append").trigger(Trigger.ProcessingTime(0)).format("console").option("truncate", "false").startquery.awaitTermination()// 关闭执行环境spark.stop()}
}

在 append 模式中, 仅输出新增的数据, 且输出后的数据无法变更。

第一次输入数据: 2023-08-07 10:55:00,dog  。 这个条数据作为第一批数据。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 个窗口。 由于此时初始 watermask=0, 当前批次中所有窗口的结束时间均大于 watermask。但是 Structured Streaming 无法确定后续批次的数据中是否会更新当前批次的内容。 因此, 基于 Append 模式的特点, 这时并不会输出任何数据(因为输出后数据就无法更改了), 直到某个窗口的结束时间小于 watermask, 即可以确定后续数据不会再变更该窗口的聚合结果时才会将其输出, 并移除内存中对应窗口的聚合状态。

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark。 本批次只有一个数据(10:55), 所有: watermark = 10:55 - 2min = 10:53

第二次输入数据: 2023-08-07 11:00:00,dog  。这条数据作为第二批数据, 计算得到 5 个窗口。 此时的watermark=10:53, 所有的窗口的结束时间均大于 watermark, 仍然不会输出。

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后计算 watermark = 11:00 - 2min = 10:58

第三次输入数据: 2023-08-07 10:55:00,dog 。相当于一条延迟数据,这条数据作为第 3 批次, 计算得到 5 个窗口。 此时的 watermark = 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58。

|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |

则意味着这两个窗口的数据不会再发生变化, 此时输出这个两个窗口的聚合结果, 并在内存中清除这两个窗口的状态。所以这次输出结果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
+------------------------------------------+----+-----+

第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变。(因为 watermask 只能增加不能减少)

3.4. watermark 机制总结

  • watermark 在用于基于时间的状态聚合操作时, 该时间可以基于窗口, 也可以基于 event-timeb本身。
  • 输出模式必须是append或update。 在输出模式是complete的时候(必须有聚合), 要求每次输出所有的聚合结果。 我们使用 watermark 的目的是丢弃一些过时聚合数据, 所以complete模式使用wartermark无效也无意义。
  • 在输出模式是append时, 必须设置 watermask 才能使用聚合操作。 其实, watermask 定义了 append 模式中何时输出聚合聚合结果(状态), 并清理过期状态。
  • 在输出模式是update时, watermask 主要用于过滤过期数据并及时清理过期状态。
  • watermask 会在处理当前批次数据时更新, 并且会在处理下一个批次数据时生效使用。 但如果节点发送故障, 则可能延迟若干批次生效。
  • withWatermark 必须使用与聚合操作中的时间戳列是同一列。df.withWatermark("time", "1 min").groupBy("time2").count() 无效。
  • withWatermark 必须在聚合之前调用。 f.groupBy("time").count().withWatermark("time", "1 min") 无效。

4. 流数据去重

需求内容:根据唯一的 id 实现数据去重

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket数据源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 数据预处理val words: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), Timestamp.valueOf(arr(1)), arr(2))}).toDF("uid", "ts", "word")// 去重重复数据 uid 相同就是重复.  可以传递多个列val wordCounts: Dataset[Row] = words.withWatermark("ts", "2 minutes").dropDuplicates("uid")// 输出数据wordCounts.writeStream.outputMode("append").format("console").start.awaitTermination()// 关闭执行环境spark.stop()}
}

数据输入(按顺序从上到下):

1,2023-08-09 11:50:00,dog
2,2023-08-09 11:51:00,dog
1,2023-08-09 11:50:00,dog
3,2023-08-09 11:53:00,dog
1,2023-08-09 11:50:00,dog
4,2023-08-09 11:45:00,dog

注意点:

  • dropDuplicates 不可用在聚合之后, 即通过聚合得到的 df/ds 不能调用dropDuplicates 
  • 使用watermask - 如果重复记录的到达时间有上限,则可以在事件时间列上定义水印,并使用guid和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些记录不会再被重复。这限制了查询必须维护的状态量。 
  • 没有watermask - 由于重复记录可能到达时没有界限,查询将来自所有过去记录的数据存储为状态。

测试:

  • 第一次输入数据:1,2023-08-09 11:50:00,dog
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  1|2023-08-09 11:50:00| dog|
+---+-------------------+----+
  • 第二次输入数据:2,2023-08-09 11:51:00,dog
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  2|2023-08-09 11:51:00| dog|
+---+-------------------+----+
  • 第三次输入数据:1,2023-08-09 11:50:00,dog (id 重复无输出)
  • 第四次输入数据:3,2023-08-09 11:53:00,dog (此时 watermask=11:51)
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  3|2023-08-09 11:53:00| dog|
+---+-------------------+----+
  • 第五次输入数据:1,2023-08-09 11:50:00,dog (数据重复, 并且数据过期, 所以无输出)
  • 第六次输入数据:4,2023-08-09 11:45:00,dog (数据过时, 所以无输出)

5. join操作

        Structured Streaming 支持 streaming DataSet/DataFrame 与静态的DataSet/DataFrame 进行 join, 也支持 streaming DataSet/DataFrame与另外一个streaming DataSet/DataFrame 进行 join。join 的结果也是持续不断的生成, 类似于前面的 streaming 的聚合结果。

5.1. Stream-static Joins

静态数据:

lisi,male
zhiling,female
zs,male

流式数据:

lisi,20
zhiling,40
ww,30

5.1.1. 内连接

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 1. 静态 dfval arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")// 2. 流式 dfval lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val streamDF: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt)}).toDF("name", "age")// 3. join   等值内连接  a.name=b.nameval joinResult: DataFrame = streamDF.join(staticDF, "name")// 4. 输出joinResult.writeStream.outputMode("append").format("console").start.awaitTermination()// 关闭执行环境spark.stop()}
}

数据输出:

+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|   lisi| 20|  male|
+-------+---+------+

5.1.2. 外连接

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 1. 静态 dfval arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")// 2. 流式 dfval lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val streamDF: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt)}).toDF("name", "age")// 3. joinval joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left")// 4. 输出joinResult.writeStream.outputMode("append").format("console").start.awaitTermination()// 关闭执行环境spark.stop()}
}

数据输出:

+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|     ww| 30|  null|
|   lisi| 20|  male|
+-------+---+------+

5.2. Stream-stream Joins

        在 Spark2.3, 开始支持 stream-stream join。Spark 会自动维护两个流的状态, 以保障后续流入的数据能够和之前流入的数据发生 join 操作, 但这会导致状态无限增长。 因此, 在对两个流进行 join 操作时, 依然可以用 watermark 机制来消除过期的状态, 避免状态无限增长。

第 1 个数据格式:姓名,年龄,事件时间

lisi,female,2023-08-09 11:50:00
zs,male,2023-08-09 11:51:00
ww,female,2023-08-09 11:52:00
zhiling,female,2023-08-09 11:53:00
fengjie,female,2023-08-09 11:54:00
yifei,female,2023-08-09 11:55:00

第 2 个数据格式:姓名,年龄,事件时间

lisi,18,2023-08-09 11:50:00
zs,19,2023-08-09 11:51:00
ww,20,2023-08-09 11:52:00
zhiling,22,2023-08-09 11:53:00
yifei,30,2023-08-09 11:54:00
fengjie,98,2023-08-09 11:55:00

5.2.1. inner join

对 2 个流式数据进行 join 操作,输出模式仅支持append模式。

不带 watermast 的 inner join(join 的速度很慢):

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 个 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name", "sex", "ts1")// 第 2 个 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name", "age", "ts2")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")// 数据输出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 关闭执行环境spark.stop()}
}//      数据输出:
//      +-------+------+-------------------+---+-------------------+
//      |   name|   sex|                ts1|age|                ts2|
//      +-------+------+-------------------+---+-------------------+
//      |zhiling|female|2023-08-09 11:53:00| 22|2023-08-09 11:53:00|
//      |     ww|female|2023-08-09 11:52:00| 20|2023-08-09 11:52:00|
//      |  yifei|female|2023-08-09 11:55:00| 30|2023-08-09 11:54:00|
//      |     zs|  male|2023-08-09 11:51:00| 19|2023-08-09 11:51:00|
//      |fengjie|female|2023-08-09 11:54:00| 98|2023-08-09 11:55:00|
//      |   lisi|female|2023-08-09 11:50:00| 18|2023-08-09 11:50:00|
//      +-------+------+-------------------+---+-------------------+

带 watermast 的 inner join(join 的速度很慢):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 个 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name1", "sex", "ts1")// 第 2 个 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name2", "age", "ts2").withWatermark("ts2", "1 minutes")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream,expr("""|name1=name2 and|ts2 >= ts1 and|ts2 <= ts1 + interval 1 minutes""".stripMargin))// 数据输出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 关闭执行环境spark.stop()}
}//      数据输出:
//      +-------+------+-------------------+-------+---+-------------------+
//      |  name1|   sex|                ts1|  name2|age|                ts2|
//      +-------+------+-------------------+-------+---+-------------------+
//      |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
//      |     ww|female|2023-08-09 11:52:00|     ww| 20|2023-08-09 11:52:00|
//      |     zs|  male|2023-08-09 11:51:00|     zs| 19|2023-08-09 11:51:00|
//      |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
//      |   lisi|female|2023-08-09 11:50:00|   lisi| 18|2023-08-09 11:50:00|
//      +-------+------+-------------------+-------+---+-------------------+

4.2.2. outer join

外连接必须使用 watermast,和内连接相比, 代码几乎一致, 只需要在连接的时候指定下连接类型即可:joinType = "left"。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 个 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name1", "sex", "ts1")// 第 2 个 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name2", "age", "ts2").withWatermark("ts2", "1 minutes")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream,expr("""|name1=name2 and|ts2 >= ts1 and|ts2 <= ts1 + interval 1 minutes""".stripMargin),joinType = "left")// 数据输出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 关闭执行环境spark.stop()}
}//      数据输出:
//        +-------+------+-------------------+-------+---+-------------------+
//        |  name1|   sex|                ts1|  name2|age|                ts2|
//        +-------+------+-------------------+-------+---+-------------------+
//        |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
//        |     ww|female|2023-08-09 11:52:00|     ww| 20|2023-08-09 11:52:00|
//        |     zs|  male|2023-08-09 11:51:00|     zs| 19|2023-08-09 11:51:00|
//        |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
//        |   lisi|female|2023-08-09 11:50:00|   lisi| 18|2023-08-09 11:50:00|
//        +-------+------+-------------------+-------+---+-------------------+

6. Streaming DF/DS 不支持的操作

到目前, DF/DS 的有些操作 Streaming DF/DS 还不支持:

  • 多个Streaming 聚合(例如在 DF 上的聚合链)目前还不支持
  • limit 和取前 N 行还不支持
  • distinct 也不支持
  • 仅仅支持对 complete 模式下的聚合操作进行排序操作
  • 仅支持有限的外连接
  • 有些方法不能直接用于查询和返回结果, 因为他们用在流式数据上没有意义
    • count() 不能返回单行数据, 必须是s.groupBy().count()
    • foreach() 不能直接使用, 而是使用: ds.writeStream.foreach(...)
    • show() 不能直接使用, 而是使用 console sink

如果执行上面操作会看到这样的异常: operation XYZ is not supported with streaming DataFrames/Datasets。


注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/1380722.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Docker安装 Kibana

目录 前言安装Kibana步骤1&#xff1a;准备1. 安装docker2. 搜索可以使用的镜像。3. 也可从docker hub上搜索镜像。4. 选择合适的redis镜像。 步骤2&#xff1a;拉取 kibana 镜像拉取镜像查看已拉取的镜像 步骤3&#xff1a;创建容器创建容器方式1&#xff1a;快速创建容器 步骤…

怎么学习AJAX相关技术? - 易智编译EaseEditing

学习AJAX&#xff08;Asynchronous JavaScript and XML&#xff09;相关技术可以让你实现网页的异步数据交互&#xff0c;提升用户体验。以下是一些学习AJAX技术的步骤和资源&#xff1a; HTML、CSS和JavaScript基础&#xff1a; 首先&#xff0c;确保你已经掌握了基本的HTML…

网络营销新人应该掌握的6大通关方法

当然大多数网络营销新人的基础目的还是想实现网赚的第一步&#xff0c;那么我们以网赚为基础的前提下&#xff0c;新人应该掌握以下6大通关方法&#xff1a; 一、微博营销 以新浪微博为例&#xff0c;营销大号已然成为微博江湖里盈利模式较清晰的一种模式。这是目前营销模式…

破解密码--csa

#passwd [USERNAME] 修改或者设置用户的密码 #passwd -d [USERNAME] 删除用户密码 注&#xff1a; 管理执行passwd&#xff1a; 可以指定用户名&#xff1b;不要输入之前的密码&#xff1b;可以更改和删除所有用户的密码&#xff1b;密码等级没有严格要求 普通用户执行p…

程序员的代码行数越少越好?

点击上方“码农突围”&#xff0c;马上关注&#xff0c;每天上午8:50准时推送 这里是码农充电第一站&#xff0c;回复“666”&#xff0c;获取一份专属大礼包 真爱&#xff0c;请设置“星标”或点个“在看” 作者 | Ryland Goldstein 出自 | CSDN 代码行数越少越好&#xff1f;…

PE启动盘和U启动盘(第三十六课)

PE启动盘和U启动盘(第三十六课) 一 WindowsPE工具盘 1. 制作WinPE镜像光盘 双击WePE64_V2.2-是-点击右下角光盘图标-选择ISO的输出位置-立即生成ISO 2. 通过光盘启动WinPE

细胞分裂2java版第4关怎么过_奇迹暖暖第二卷第二章第4关怎么过_ 奇迹暖暖II-2-4通关攻略_游戏吧...

奇迹暖暖游戏中有很多玩家都想知道第二卷第二章第4关怎么过&#xff0c;下面游戏吧小编为大家奇迹暖暖II-2-4通关攻略&#xff0c;感谢大家的阅读&#xff0c;更多精彩内容请关注游戏吧&#xff01; 属性&#xff1a;华丽、可爱、活泼、清纯、清凉&#xff1b; 技能推荐&#x…

面试问题大通关

面试问题大通关大灰狼 问题一&#xff1a;“请你自我介绍一下” 常规思路&#xff1a; ①介绍内容要与个人简历相一致&#xff1b; ②表述方式上尽量口语化&#xff1b;③要切中要害&#xff0c;不谈无关、无用的内容&#xff1b;④条理要清晰&#xff0c;层次要分明&a…

FC金手指代码大全·持续更新-亲测可用-FC 经典游戏完整可用的金手指大全---持续更新,偶尔玩玩经典回味无穷,小时候不能通关的现在通通通关一遍

FC 经典游戏完整可用的金手指大全—持续更新&#xff0c;偶尔玩玩经典回味无穷&#xff0c;小时候不能通关的现在通通通关一遍 2021年5月11日更新&#xff1a; 每次翻金手指一些垃圾小网站标题党吸引进去吓一大堆木马什么也没有&#xff0c;什么xx无敌版&#xff0c;浪费了宝…

codecombat极客战记森林61-70通关代码

codecombat极客战记森林61-70通关代码 python代码 61.罪与罚 # 看起来食人魔酋长正在偷你的宝石啊&#xff01; # 指挥两门火炮攻击敌人达成目标。while True:item hero.findNearestItem()if item:itempos item.pos.x " " item.pos.yhero.say("item at :&…

在线sqli-labs 通关大全 Less-2

sqli-labs (2) 励志语录 青年是学习智慧的时期&#xff0c;中年是付诸实践的时期。——卢梭 1.知识点 sql注入常见参数 user()&#xff1a;当前数据库用户 database()&#xff1a;当前数据库名 concat()&#xff1a;联合数据&#xff0c;用于联合两条数据结果。如 concat(us…

在线sqli-labs 通关大全 Less-1

sqli-labs (1) sqli-labs实验前&#xff1a; 本人因在sqli-labs搭建过程中遇到大量问题&#xff0c;百度后依旧无果&#xff0c;所以现决定在在线sqli-labs平台进行试验。 sqli-labs在线平台: http://43.247.91.228:84/ 在线平台sqli-labs Page-1 点击Less-1可直接进行试验 …

龙狼三国神龙守护者自动通关脚本

2019独角兽企业重金招聘Python工程师标准>>> 本段代码是一个按键精灵脚本 龙狼三国的神龙守护者既刷新BUG被修复后&#xff0c;官方又想出了新的策略&#xff1a;每次需要用鼠标左键单击“开始战斗”进入神龙守护者&#xff0c;而连续两次鼠标不能在同一个位置点击这…

Java面试通关要点

Java面试通关要点 2018-03-23 梁桂钊 占小狼的博客 占小狼的博客 占小狼的博客 微信号 whywhy_zj 功能介绍 Java进阶技术干货、实践分享&#xff0c;跟着狼哥一起学习JVM、性能调优&#xff0c;欢迎关注。 首先&#xff0c;声明下&#xff0c;以下知识点并非阿里的面试题。 …

商家如何低成本搭建声音人无人直播开展带货直播?

随着直播行业日渐火热&#xff0c;人工智能也开始逐渐渗透到这个领域。在前不久刚出现的AI数字人直播间已经成为一种创新的直播模式。而在这个领域里&#xff0c;声音人无人直播也逐渐展现出了自己的优势。那么&#xff0c;在AI数字人直播之后出现的声音人无人直播又有哪些优势…

这五部关于宇宙的神级纪录片,带你探索未知的外太空世界

宇宙之大无奇不有&#xff0c;在你的认知里你又知道多少关于宇宙的事情&#xff0c;如果单单用外星人概括你所对宇宙的认知就真的太片面了&#xff0c;小编今天就带来下面这四部关于宇宙的硬核纪录片&#xff0c;带你真正的去了解关于宇宙的知识&#xff0c;让你遨游在宇宙的知…

有没有人救救

c语言掷骰子代码 不会求求

放置英雄-深空探索-传输空间 地图

这个传输空间有点复杂&#xff0c;整理了一个地图出来&#xff0c;需要的朋友可以参考一下

Hello工作室制作《无人深空》更新档

《无人深空》&#xff08;No Man’s Sky&#xff09;可能很快又要更新了&#xff0c;这款备受争议的太空探索游戏曾在去年12月进行过一次大型更新&#xff0c;不知道本次制作的更新规模如何&#xff0c;是单纯的bug修复还是有额外内容扩充。 无人深空 《无人深空》&#xff08;…

盘点美军的无人机家底

相对于国内无人机产业,国外军用无人机起步较早,积累了大量发展经验和先进理念&#xff0c;尤其是美国&#xff0c;在发动的近几场战争中都广泛应用无人机进行监视和侦察&#xff0c;还参与空中打击行动。军用无人机的使用大大能降低军事行动中的经济成本和自身伤亡情况。既然如…