Spark权威指南读书笔记(五):流处理

第二十章 流处理基础

什么是流处理

流处理是连续处理新到来的数据以更新计算结果的行为。在流处理中,输入数据是无边界的,没有预定的开始或结束。

流处理的挑战
  • 基于应用程序时间戳处理无序数据
  • 维持大量的状态
  • 支持高吞吐
  • 即使有机器故障也仅需对事件进行一次处理
  • 处理负载不平衡和拖延者
  • 快速响应事件
  • 与其他存储中的数据进行连接
  • 确定新事件到达时如何更新输出

流处理设计要点

记录级别API和申明式API
  • 流处理API最简单的实现方法就是将每个事件传递给应用程序,并使用自定义代码进行响应。提供这种记录级别的API的流处理系统只是给用户提供一个获取每条流数据记录的接口,这样许多复杂状态需要由应用程序负责。
  • 因此后续的流处理系统提供了声明式API,应用程序为了响应每个新事件指定要计算的内容,而不是如何计算,也不需要考虑如何从失败中恢复。DStream API可以自动追踪每个操作处理的数据量,可靠地保存相关状态,并在需要的时候从失败中恢复计算。
连续处理与微批处理
  • 连续处理模式中,每个节点都不断侦听来自其他节点的消息并将新的更新输出到其子节点。map-reduce中,map的每个节点将从输入源一个一个地读取记录,根据计算逻辑将它们发送到相应的reducer,reducer获取新记录时,将更新状态


    连续处理
  • 微批处理系统等待积累少量输入数据,然后使用分布式任务集合并行处理每个批次


    微批处理

Spark的流处理API

DStream API
  • 它完全基于Java/Python对象函数,而不是DataFrame Dataset中的结构化表概念
  • 完全基于处理时间
  • 仅支持微批处理
结构化流处理
  • Spark 2.2仅支持微批处理
  • 提供结构化处理流式数据的可能

第二十一章 结构化流处理基础

结构化流处理概述

结构化流处理背后的主要思想是将数据流视为连续追加数据的数据表
结构化流即是以流处理方式处理的DataFrame

结构化流处理的输入

核心概念

转换和动作

同样适用这两个概念,只是略微有一些限制

输入源
  • Kafka
  • HDFS
  • 用于测试的socket源
接收器
  • 需要指定数据源来读取数据流,接收器(sink)和执行引擎还负责可靠地跟踪号数据处理的进度
  • 支持的接收器:kafka、文件、用于测试的控制台接收器、用于调试的内存接收器、用于在输出记录上运行任意计算的foreach接收器
输出模式
  • append (向输出接收器中添加新记录)
  • update (更新有变化的记录)
  • complete (重写所有的输出)
触发器
  • 定义了数据何时被输出
事件时间处理
  • watermarks 允许你制定在事件时间内查看数据的
  • 支持事件时间的系统通常允许设置watermarks来限制它们记住旧数据的时长

结构化流处理实例

val static = spark.read.json("/data/activity-data/")
val dataSchema = static.schema

val streaming = spark.readStream.schema(dataSchema)
  .option("maxFilesPerTrigger", 1).json("/data/activity-data")

val activityCounts = streaming.groupBy("gt").count()

spark.conf.set("spark.sql.shuffle.partitions", 5)

val activityQuery = activityCounts.writeStream.queryName("activity_counts")
  .format("memory").outputMode("complete")
  .start()
// 启动流式计算
activityQuery.awaitTermination()
// 查询流数据
for( i <- 1 to 5 ) {
    spark.sql("SELECT * FROM activity_counts").show()
    Thread.sleep(1000)
}
数据输入和输出
// 读取kafka
val ds3 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
// 写入kafka
ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .option("checkpointLocation", "/to/HDFS-compatible/dir")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

// foreach接收器
datasetOfString.write.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open a database connection
  }
  def process(record: String) = {
    // write string to connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
})

// 触发器
import org.apache.spark.sql.streaming.Trigger

activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
  .format("console").outputMode("complete").start()

第二十二章 事件时间和有状态处理

有状态处理

  • 当你执行有状态操作时,Spark会为你处理所有复杂的事情。例如,在实现分组操作时,结构化流处理会为你维护并更新信息,你只需指定处理逻辑。在执行有状态操作时,Spark会将中间结果信息存储在状态存储中。Spark当前的状态存储实现是一个内存状态存储,它通过将中间状态存储到检查点目录来实现容错。

滚动窗口

  • 窗口不会发生重叠,指定窗口的间隔
  • 时间窗口实际上是一个结构体


    滚动窗口
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

滑动窗口

  • 窗口可以重叠


    滑动窗口
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

使用水位处理延迟数据

  • 指定水位可以确定过期数据
  • 指定水位的方式
import org.apache.spark.sql.functions.{window, col}
withEventTime
  .withWatermark("event_time", "5 hours")
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

在流中删除重复项

dropDuplicates

import org.apache.spark.sql.functions.expr

withEventTime
  .withWatermark("event_time", "5 seconds")
  .dropDuplicates("User", "event_time")
  .groupBy("User")
  .count()
  .writeStream
  .queryName("deduplicated")
  .format("memory")
  .outputMode("complete")
  .start()

任意有状态处理

  • 可以根据给定键的计数创建窗口
  • 如果特定时间范围发生多个特定事件则报警
  • 如果不确定时间内维护用户会话,保存这些会话一遍稍后进行分析

执行这类处理时需要做以下两件事

  • 映射数据中的组,对每组数据进行操作,并为每个组生成至多一行(mapGroups WithState)
  • 映射数据中的组,对每个组生成一行或多行(flatMapGroups WithState)
超时

可以通过GroupState.setTimeoutTimes tamp(...) API设置超时时间戳

输出模式
  • mapGroupWithState仅支持update更新
  • flatMapGroupsWithState支持append追加输出和update更新输出
mapGroupsWithState

需要给出如下定义

  • 三个类定义:输入定义、状态定义、可选的输出定义
  • 基于键、事件迭代器和先前状态的一个更新状态函数
  • 超时时间函数
// 类定义
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String,
  var activity:String,
  var start:java.sql.Timestamp,
  var end:java.sql.Timestamp)

// 事件迭代器、状态更新函数
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
  if (Option(input.timestamp).isEmpty) {
    return state
  }
  if (state.activity == input.activity) {

    if (input.timestamp.after(state.end)) {
      state.end = input.timestamp
    }
    if (input.timestamp.before(state.start)) {
      state.start = input.timestamp
    }
  } else {
    if (input.timestamp.after(state.end)) {
      state.start = input.timestamp
      state.end = input.timestamp
      state.activity = input.activity
    }
  }

  state
}

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}
def updateAcrossEvents(user:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserState]):UserState = {
  var state:UserState = if (oldState.exists) oldState.get else UserState(user,
        "",
        new java.sql.Timestamp(6284160000000L),
        new java.sql.Timestamp(6284160L)
    )
  // we simply specify an old date that we can compare against and
  // immediately update based on the values in our data

  for (input <- inputs) {
    state = updateUserStateWithEvent(state, input)
    oldState.update(state)
  }
  state
}

// 启动
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime
  .selectExpr("User as user",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity")
  .as[InputRow]
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("update")
  .start()
flatMapGroupsWithState

需要定义以下内容

  • 三个类定义:输入定义、状态定义、可选的输出定义
  • 一个函数,输入参数为一个键、一个多事件迭代器和先前状态
  • 超时时间函数
case class InputRow(uid:String, timestamp:java.sql.Timestamp, x:Double,
  activity:String)
case class UserSession(val uid:String, var timestamp:java.sql.Timestamp,
  var activities: Array[String], var values: Array[Double])
case class UserSessionOutput(val uid:String, var activities: Array[String],
  var xAvg:Double)

def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
  // handle malformed dates
  if (Option(input.timestamp).isEmpty) {
    return state
  }

  state.timestamp = input.timestamp
  state.values = state.values ++ Array(input.x)
  if (!state.activities.contains(input.activity)) {
    state.activities = state.activities ++ Array(input.activity)
  }
  state
}

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
  GroupState}

def updateAcrossEvents(uid:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {

  inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
    val state = if (oldState.exists) oldState.get else UserSession(
    uid,
    new java.sql.Timestamp(6284160000000L),
    Array(),
    Array())
    val newState = updateWithEvent(state, input)

    if (oldState.hasTimedOut) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else if (state.values.length > 1000) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else {
      oldState.update(newState)
      oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
      Iterator()
    }

  }
}

import org.apache.spark.sql.streaming.GroupStateTimeout

withEventTime.where("x is not null")
  .selectExpr("user as uid",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "x", "gt as activity")
  .as[InputRow]
  .withWatermark("timestamp", "5 seconds")
  .groupByKey(_.uid)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("count_based_device")
  .format("memory")
  .start()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容