Structured Streaming
structured streaming是一种可伸缩的、容错的、基于Spark SQL引擎的流式计算引擎。你可以使用,与针对静态数据的批处理计算操作一样的方式来编写流式计算操作。随着数据不断地到达,Spark SQL引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。可以使用java、scala等编程语言,以及dataset/dataframe api来编写计算操作,执行数据流的聚合、基于event的滑动窗口、流式数据与离线数据的join等操作。所有这些操作都与Spark SQL使用一套引擎来执行。此外,structured streaming会通过checkpoint和预写日志等机制来实现一次且仅一次的语义。简单来说,对于开发人员来说,根本不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可,structured streaming在底层会自动去实现快速、可伸缩、容错、一次且仅一次语义。
spark 2.0仅仅是提供beta版本的structured streaming,所有的相关api都是实验性质的。
WordCount入门案例
object StructuredNetworkWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", "spark-project-1")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}