最近看了下structured streaming 的基本用法,大部分虽然是翻译官方文档,但是从翻译中也可以加深理解。
基本介绍和编程模型
Spark2.2.0 在7月12号发布,这个版本的Structured Streaming 抛掉了试验的标签,可以正式在生产环境使用。
Structured Streaming 是基于Spark SQL 引擎的流式计算引擎,将流式计算应用于DataFrame.随着数据不断地到达,Spark SQL引擎会以一种增量的方式来执行这些操作,并且持续更新计算结果。其基本概念就是将输入数据流作为
“Input Table”,每次新收到的数据会成为该表新的一行。
每次针对数据的查询都会生成一个“Result Table”。每一次的
触发间隔(比如说1s),Input Table 新增的一行,最终都会在Result Table 进行更新。当result table 更新的
时候,我们可能会将改变的数据写入外部存储。
Input Source
File source - 以文件流的形式读取目录中写入的文件。 支持的文件格式为text,csv,json,parquet。 有关更多最新列表,可以看下DataStreamReader界面的文档,并支持各种文件格式的选项。 请注意,文件必须是被移动到目录中的,比如用mv命令。
kafka source - 从kafka poll 数据,兼容 kafka broker 0.10.0 或更高版本。更多详情看
Kafka Integration Guide
Socket source (for testing )从socket 连接中读取 UTF8 数据,仅用于测试,不提供容错保证。
某些数据源是不支持容错的,因为它们不能保证在故障之后可以通过checkedpoint offsets 来重新消费数据。
Source | Options | Fault-tolerant | Notes |
---|---|---|---|
File | path:输入路径,适用所有格式 maxFilesPerTrigger:每次触发时,最大新文件数(默认:无最大) latestFirst:是否首先处理最新的文件,当有大量积压的文件时,有用(默认值:false) fileNameOnly:是否仅根据文件名而不是完整路径检查新文件(默认值:false)。将此设置为“true”,以下文件将被视为相同的文件,因为它们的文件名“dataset.txt”是相同的:· "file:///dataset.txt" "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" | YES | 支持glob路径,但不支持多个逗号分隔的 paths/globs. |
socket | host port | NO | ------ |
kafka | 参见Kafka Integration Guide | YES | ------ |
三种不同的输出模式
Complete Mode - 更新后的整个Result Table将被写入外部存储。 由外部存储决定如何处理整个表的写入。
Append Mode - 在Result Table中,只有自上次触发后新增到result table中的数据将被写入外部存储。 这仅适用于不期望更改结果表中现有行的查询,也就是说,我们确定,result table中已有的数据是肯定不会被改变的,才使用这种模式。
Update Mode - 只有自上次触发以后在Result Table中更新的数据(包括新增的和修改的)将被写入外部存储(可用于Spark 2.1.1)。 这与完全模式不同,因为此模式仅输出自上次触发以来更改的行。 如果查询不包含聚合,它将等同于Append Mode。
简单例子
object WordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("WordCount")
.master("local")
.getOrCreate()
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCount= words .groupBy("value").count()
//执行此代码后,流式计算将在后台启动。
//qurey对象是该活动流查询的句柄
//使用awaitTermination()等待查询的终止.
val qurey = wordCount.writeStream
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(2))
.format("console")
.start()
qurey.awaitTermination()
}
}
lines 为DataFrame是input table,这个表包含了一个名为"value"的列,现在还没有开始收到任何数据,因为我们只是做了transformation操作。 接下来,我们使用.as [String]将DataFrame转换为String数据集,通过flatMap操作将每一行分割成多个单词。 最后,我们通过分组操作生成wordCounts DataFrame。通过start()方法开启流计算。
流数据生成的DataFrame经查询生成wordCounts与静态DataFrame完全相同。 但是,当该查询启动时,Spark将连续检查套接字连接中的新数据。 如果有新数据,Spark将运行一个“增量”查询,将以前的运行计数与新数据相结合,以计算更新的计数,如下所示。