第二十章 流处理基础
什么是流处理
流处理是连续处理新到来的数据以更新计算结果的行为。在流处理中,输入数据是无边界的,没有预定的开始或结束。
流处理的挑战
- 基于应用程序时间戳处理无序数据
- 维持大量的状态
- 支持高吞吐
- 即使有机器故障也仅需对事件进行一次处理
- 处理负载不平衡和拖延者
- 快速响应事件
- 与其他存储中的数据进行连接
- 确定新事件到达时如何更新输出
流处理设计要点
记录级别API和申明式API
- 流处理API最简单的实现方法就是将每个事件传递给应用程序,并使用自定义代码进行响应。提供这种记录级别的API的流处理系统只是给用户提供一个获取每条流数据记录的接口,这样许多复杂状态需要由应用程序负责。
- 因此后续的流处理系统提供了声明式API,应用程序为了响应每个新事件指定要计算的内容,而不是如何计算,也不需要考虑如何从失败中恢复。DStream API可以自动追踪每个操作处理的数据量,可靠地保存相关状态,并在需要的时候从失败中恢复计算。
连续处理与微批处理
-
连续处理模式中,每个节点都不断侦听来自其他节点的消息并将新的更新输出到其子节点。map-reduce中,map的每个节点将从输入源一个一个地读取记录,根据计算逻辑将它们发送到相应的reducer,reducer获取新记录时,将更新状态
-
微批处理系统等待积累少量输入数据,然后使用分布式任务集合并行处理每个批次
Spark的流处理API
DStream API
- 它完全基于Java/Python对象函数,而不是DataFrame Dataset中的结构化表概念
- 完全基于处理时间
- 仅支持微批处理
结构化流处理
- Spark 2.2仅支持微批处理
- 提供结构化处理流式数据的可能
第二十一章 结构化流处理基础
结构化流处理概述
结构化流处理背后的主要思想是将数据流视为连续追加数据的数据表
结构化流即是以流处理方式处理的DataFrame
核心概念
转换和动作
同样适用这两个概念,只是略微有一些限制
输入源
- Kafka
- HDFS
- 用于测试的socket源
接收器
- 需要指定数据源来读取数据流,接收器(sink)和执行引擎还负责可靠地跟踪号数据处理的进度
- 支持的接收器:kafka、文件、用于测试的控制台接收器、用于调试的内存接收器、用于在输出记录上运行任意计算的foreach接收器
输出模式
- append (向输出接收器中添加新记录)
- update (更新有变化的记录)
- complete (重写所有的输出)
触发器
- 定义了数据何时被输出
事件时间处理
- watermarks 允许你制定在事件时间内查看数据的
- 支持事件时间的系统通常允许设置watermarks来限制它们记住旧数据的时长
结构化流处理实例
val static = spark.read.json("/data/activity-data/")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema)
.option("maxFilesPerTrigger", 1).json("/data/activity-data")
val activityCounts = streaming.groupBy("gt").count()
spark.conf.set("spark.sql.shuffle.partitions", 5)
val activityQuery = activityCounts.writeStream.queryName("activity_counts")
.format("memory").outputMode("complete")
.start()
// 启动流式计算
activityQuery.awaitTermination()
// 查询流数据
for( i <- 1 to 5 ) {
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}
数据输入和输出
// 读取kafka
val ds3 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
// 写入kafka
ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream.format("kafka")
.option("checkpointLocation", "/to/HDFS-compatible/dir")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// foreach接收器
datasetOfString.write.foreach(new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// open a database connection
}
def process(record: String) = {
// write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// close the connection
}
})
// 触发器
import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
.format("console").outputMode("complete").start()
第二十二章 事件时间和有状态处理
有状态处理
- 当你执行有状态操作时,Spark会为你处理所有复杂的事情。例如,在实现分组操作时,结构化流处理会为你维护并更新信息,你只需指定处理逻辑。在执行有状态操作时,Spark会将中间结果信息存储在状态存储中。Spark当前的状态存储实现是一个内存状态存储,它通过将中间状态存储到检查点目录来实现容错。
滚动窗口
- 窗口不会发生重叠,指定窗口的间隔
-
时间窗口实际上是一个结构体
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
滑动窗口
-
窗口可以重叠
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
使用水位处理延迟数据
- 指定水位可以确定过期数据
- 指定水位的方式
import org.apache.spark.sql.functions.{window, col}
withEventTime
.withWatermark("event_time", "5 hours")
.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
在流中删除重复项
dropDuplicates
import org.apache.spark.sql.functions.expr
withEventTime
.withWatermark("event_time", "5 seconds")
.dropDuplicates("User", "event_time")
.groupBy("User")
.count()
.writeStream
.queryName("deduplicated")
.format("memory")
.outputMode("complete")
.start()
任意有状态处理
- 可以根据给定键的计数创建窗口
- 如果特定时间范围发生多个特定事件则报警
- 如果不确定时间内维护用户会话,保存这些会话一遍稍后进行分析
执行这类处理时需要做以下两件事
- 映射数据中的组,对每组数据进行操作,并为每个组生成至多一行(mapGroups WithState)
- 映射数据中的组,对每个组生成一行或多行(flatMapGroups WithState)
超时
可以通过GroupState.setTimeoutTimes tamp(...) API设置超时时间戳
输出模式
- mapGroupWithState仅支持update更新
- flatMapGroupsWithState支持append追加输出和update更新输出
mapGroupsWithState
需要给出如下定义
- 三个类定义:输入定义、状态定义、可选的输出定义
- 基于键、事件迭代器和先前状态的一个更新状态函数
- 超时时间函数
// 类定义
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String,
var activity:String,
var start:java.sql.Timestamp,
var end:java.sql.Timestamp)
// 事件迭代器、状态更新函数
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
if (Option(input.timestamp).isEmpty) {
return state
}
if (state.activity == input.activity) {
if (input.timestamp.after(state.end)) {
state.end = input.timestamp
}
if (input.timestamp.before(state.start)) {
state.start = input.timestamp
}
} else {
if (input.timestamp.after(state.end)) {
state.start = input.timestamp
state.end = input.timestamp
state.activity = input.activity
}
}
state
}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}
def updateAcrossEvents(user:String,
inputs: Iterator[InputRow],
oldState: GroupState[UserState]):UserState = {
var state:UserState = if (oldState.exists) oldState.get else UserState(user,
"",
new java.sql.Timestamp(6284160000000L),
new java.sql.Timestamp(6284160L)
)
// we simply specify an old date that we can compare against and
// immediately update based on the values in our data
for (input <- inputs) {
state = updateUserStateWithEvent(state, input)
oldState.update(state)
}
state
}
// 启动
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime
.selectExpr("User as user",
"cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity")
.as[InputRow]
.groupByKey(_.user)
.mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("update")
.start()
flatMapGroupsWithState
需要定义以下内容
- 三个类定义:输入定义、状态定义、可选的输出定义
- 一个函数,输入参数为一个键、一个多事件迭代器和先前状态
- 超时时间函数
case class InputRow(uid:String, timestamp:java.sql.Timestamp, x:Double,
activity:String)
case class UserSession(val uid:String, var timestamp:java.sql.Timestamp,
var activities: Array[String], var values: Array[Double])
case class UserSessionOutput(val uid:String, var activities: Array[String],
var xAvg:Double)
def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
// handle malformed dates
if (Option(input.timestamp).isEmpty) {
return state
}
state.timestamp = input.timestamp
state.values = state.values ++ Array(input.x)
if (!state.activities.contains(input.activity)) {
state.activities = state.activities ++ Array(input.activity)
}
state
}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
GroupState}
def updateAcrossEvents(uid:String,
inputs: Iterator[InputRow],
oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {
inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
val state = if (oldState.exists) oldState.get else UserSession(
uid,
new java.sql.Timestamp(6284160000000L),
Array(),
Array())
val newState = updateWithEvent(state, input)
if (oldState.hasTimedOut) {
val state = oldState.get
oldState.remove()
Iterator(UserSessionOutput(uid,
state.activities,
newState.values.sum / newState.values.length.toDouble))
} else if (state.values.length > 1000) {
val state = oldState.get
oldState.remove()
Iterator(UserSessionOutput(uid,
state.activities,
newState.values.sum / newState.values.length.toDouble))
} else {
oldState.update(newState)
oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
Iterator()
}
}
}
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime.where("x is not null")
.selectExpr("user as uid",
"cast(Creation_Time/1000000000 as timestamp) as timestamp",
"x", "gt as activity")
.as[InputRow]
.withWatermark("timestamp", "5 seconds")
.groupByKey(_.uid)
.flatMapGroupsWithState(OutputMode.Append,
GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
.writeStream
.queryName("count_based_device")
.format("memory")
.start()