structured streaming
模型思想
Structured Streaming模型是把数据流当作一个没有边界的数据表来对待,这样开发人员可以在流上使用Spark SQL进行流处理,可以使用离线spark程序的开发方式来开发流处理程序,降低了开发门槛。
这张图就显示了,在新来数据的时候,会把新的row追加到没边界的表里面。
我们以一个典型的worldCount程序为例,在流入数据的时候,会把数据写到表里面,然后在这个表上执行word count查询后,把统计出的word count写到结果表中并输出。
Structured Streaming 返回的是 DataFrame/DataSet,我们可以对其应用各种操作,从无类型,类似 SQL 的操作(例如 select,where,groupBy)到类型化的 RDD 类操作(例如 map,filter,flatMap)。
很多操作也不在流上支持:
- 还不支持多个流聚集(即,流 DF 上的聚合链)。
- 不支持 limit 和 take(N)
- 不支持 Distinct
- sort 操作仅在聚合后在完整输出模式下支持
- 流和静态流的外连接支持是有条件的:
- 不支持带有流 DataSet 的完全外连接
- 不支持右侧的流的左外连接
- 不支持左侧的流的右外部联接
- 不支持两个流之间的任何 join
- count() - 无法从流 DataSet 返回单个计数。
- foreach()
- show() - 可以输出到控制台Sink来代替。
窗口操作
较之于DStream的窗口操作,一个显著的改进是新的窗口运算可以基于”事件时间(数据所代表的事件发生时的时间)”(Event Time)进行计算而不在是数据进入到流上的时间。
<font size=2>✳ Flink支持三种时间,还有一种是事件接入时间(Ingestion Time)</font>
下图以10分钟为窗口尺度,统计了12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20三个时间窗口上word count值。dog这个单词在12:07收到,它会影响12:00 - 12:10, 12:05 - 12:15两个窗口。
<font size=2>✳ Flink区分了滚动窗口和滑动窗口,好像还有基于活跃度的会话窗口</font>
数据延迟
数据延迟基本上是没办法避免的事情,Structured Streaming可以保证一条旧的数据进入到流上时,依然可以基于这些“迟到”的数据重新计算并更新计算结果,但是这样会一个问题,即需要在流上维持一个很大时间跨度的数据集,这会消耗很大的资源。
Structured Streaming引入了一种叫watermarking的机制来应对这个问题。watermarking实际上就是数据的事件时间与在其流上能找到的最大事件时间的最大差值(Time-To-Live, TTL),如果这个差值超过了设定的阈值,就意味着数据太陈旧了,时效性超出了流计算应该关注的区间,不再参与计算。(超时机制,低于水位线的都淹死了)
- In every trigger, while aggregate the data, we also scan for the max value of event time in the trigger data
- After trigger completes, compute watermark = MAX(event time before trigger, max event time in trigger)
引擎跟踪的最大事件时间是蓝色虚线,每个触发开始时设置watermark 为(最大事件时间 - 10分钟)是红线。
我们先看一个延迟到达但没有超过watermark的例子:(12:09, cat) ,这个数据会最先进入12:05 - 12:15这个窗口(虽然正常情况下它在12:00-12:10这个窗口开启时就应该已经就绪了,显然它是一个迟到的数据)。
watermark设定为10分钟话意味着有效的事件时间可以推后到12:14 - 10m = 12:04,因为12:14是这个窗口中接收到的最晚的时间,代表目标系统最后时刻的状态,由于12:09在12:04之后,所以也被计入。
另一个超出watermark的例子是(12:04, donkey),这个时候水位线是(12:21, owl),watermark为10分钟意味着有效的事件时间可以推后到12:11,而(12:04, donkey)比这个值还要早,说明它”太旧了”,所以不会被更新到结果表中了。
<font size=2>✳ Flink支持将旧数据单独列到一个地方,便于进行后续的访问</font>
Output Mode
- Append模式:顾名思义,既然是Append,那就意味着它每次都是添加新的行,那么也就是说:它适用且只适用于那些一旦产生计算结果便永远不会去修改的情形, 所以它能保证每一行数据只被数据一次。只有select,where,map,flatMap,filter,join等的查询将支持Append模式。
- Complete模式:整张结果表在每次触发时都会全量输出!这显然是是要支撑那些针对数据全集进行的计算,例如:聚合
-
Update模式:某种意义上是和Append模式针锋相对的一个种模式,它只输出上次trigger之后,发生了“更新”的数据的,这包含新生的数据和行发生了变化的行。对于数据库类型的sink来说,这是一种理想的模式。
持续计算
spark 2.3中引入一种能够达到毫秒级低延迟的计算模式:持续计算。
两种计算模式如下:默认(micro-batches)
micro-batch
spark一直以micro-batches来处理,spark streaming 计算引擎阶段性地检查数据流,然后批量处理数据。它的延迟最低100 ms。
在处理一批数据之前,先把这一批数据记录的偏移量写到whl日志中(write head log)(用于下一批数据查询), 等到把偏移量保存完成后开始计算,这样就产生了延迟。
- 首先,要设置trigger,然后根据这个生成mini-batch
- 然后,使用 sql-engine 将dataframe转化成rdd (逻辑执行计划->物理执行计划)
- 从source获取数据,处理,并写入sink
- 循环上面的步骤,执行下一个micro-batch
- 各个batch之间,需要获取state,更新并传入下一个batch
在这种体系结构中,driver将检查点保存到WAL日志中,该日志可以用于重新启动查询。请注意,下一个micro-batch中要处理的范围偏移量将在其开始之前保存在日志中,以便获得确定性的重新执行和end-to-end语义。因此,source上可用的记录可能必须等待当前micro-batch处理完成,然后记录其偏移量,并在下一个micro-batch中进行处理。
设定trigger的触发时间为100ms,不断的对source写入数据,可以发现前一个trigger触发的数据批次计算job如果没有处理完,后一个job不会启动,不会并行的去执行job。每个trigger触发时会启动一个新的job计算当前批次数据。
持续计算
而新引入的持续计算模式下,不是阶段性的发起task,而是spark发起一个长期运行的long-running task,持续地读、计算、写。对于保存数据记录的偏移量,相当于在数据流流入spark的时候上打标记,两个标记之间叫 epoch,跟阶段的意思差不多,task在遇到一个标记的时候会异步的保存这个偏移量,对于持续计算是没有影响的。
最低延迟在1ms以下。(at least once,需要自己处理)
暂时这种模式支持的操作有限,它主要支持Map,Filter和Project。 不支持聚合操作,连接,窗口等。
- 设置为continuous trigger (check-point的时间间隔)
- 使用 sql-engine 将dataframe转化成rdd (只进行一次)
- 从source获取数据,处理,并写入sink
- 各partition long running task, 执行完数据后并不会结束,会继续拉取数据进行处理。
现在还处于试验阶段,不支持生产。社区进度也较慢。只支持map filter等操作,不支持聚合,想要聚合的话,必须coalesce(1)到一个partition上才可以。
2.4版本特性
- 在complete/append模式下,支持Limit操作。 [spark-24662]
- foreachBatch api, 参数是rdd,所以可以使用rdd来做一些暂时不支持的操作。它的数据可以写入多个location里面。对于continuous,可以有foreach操作。 [spark-24565]
- 批流是兼容check-point文件的,只需要修改trigger即可。这里修改了一个bug,在转化的时候数据是错的。 [spark-25399]
- 在多个流聚合的时候,之前是选择最小的watermark,现在可以选择最小的或者最大的。 [spark-24730]
杂项
stateful操作
如果相互批次之间的数据并没有相互影响,叫做stateless操作。但是,譬如count操作,则需要在批次之间传递数据。
因为流处理结果是不断增长的。因此,不断增长的结果需要存储在容错的State Store中。
State Store的目的是提供一个可靠的地方,引擎可以从那里读取Structured Streaming聚合的中间结果。因此,即使driver出现故障,Spark也能将状态恢复到故障前的状态。State Store是由类似于HDFS的分布式文件系统支持的。为了保证可恢复性,必须至少存储两个最近版本。例如,如果批次#10在处理过程中失败,那么State Store可能具有批次#9和批次#10一半的状态。Spark将从批次#9开始重新计算,因为批次#9是最后一个成功完成的批次。同时,State Store中还存在对于旧状态的垃圾回收机制。
State Store处理两类文件:delta文件和snapshot文件。delta文件包含每个查询执行结果的状态表示。它是由给定executor中注册的行更改提供的tmp delta file构造的(State Store与partition相关,每个executor在一个hash map中存储状态数据)。tmp delta file的名称遵从“temp-{Random.nextLong}”模式。最后,在调用commit方法时,为新版本创建最终的delta文件,其名称遵从“version.delta”模式。最后,多个delta文件合并到snapshot文件中,这些文件的名称遵从“version.snapshot”模式。
StateStore 本身也带了 maintainess 即维护模块,会周期性的在后台将过去的状态和最近若干版本的流水 log 进行合并,并把合并后的结果重新写回到 HDFS:old_snapshot + delta_a + delta_b + … => lastest_snapshot。
StateStore 模块提供了 分片的、分版本的、可迁移的、高可用 key-value store。
基于这个 StateStore 模块,StreamExecution 实现了 增量的 持续查询、和很好的故障恢复以维护 end-to-end exactly-once guarantees。
Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据(而不仅仅是本次执行信收到的数据),所以每次执行的结果是针对全量数据进行计算的结果。但是在实际执行过程中,由于全量数据会越攒越多,那么每次对全量数据进行计算的代价和消耗会越来越大。
Structured Streaming 的做法是:转全量为增量,即在每次执行时,先从 StateStore 里 restore 出上次执行后的状态,然后加入本执行的新数据,再进行计算。如果有状态改变,将把改变的状态重新 save 到 StateStore 里。所以 Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据,但在具体实现上转换为增量的持续查询。
在continuous模式下,spark通过维护一组long-running task集合来持续对数据进行read,process,write操作,这样就避免了task的创建销毁等操作,并且checkpoint的操作也优化为异步,这样就极大的减少了延迟。这种模型不再是用基于批去模拟流,而是基于事件流的思路。
那么问题来了,spark是怎么实现的呢,long-running还好说,直接在task的compute中写循环即可,那么checkpoint怎么做呢,要知道spark基于checkpoint实现容错,当一个批处理完后,spark会写一些offset,snapshot到hdfs。而现在没有批的概念。
没有批就要创造批,流是批的超集,我们需要定义一种规则在流中划分出一个个批即可。这里spark采用了Chandy-Lamport algorithm来做批的划分,从而实现分布式checkpoint。其原理是这样的,在每个task的数据流事件中注入epoch marker事件,在driver端做整体epoch的自增维护。当task处理到epoch marker事件后就通知driver,当driver发现收集到的epoch marker数量等同于source和sink的partition数量,那么就说明这一个epoch已经完成,driver端就可以把一个epoch的数据看做是一个批,从而进行checkpoint记录。
容错
streamExecution 从source获取offset,并使用这个去拿去数据,处理后写入sink,最后commit,并将结果存储至hdfs。
micro-batch 会在每个partition 把state状态保存到hdfs。每个batch会写版本号。
如果出错的时候,可以从文件中恢复数据。
由于 exectutor 节点的故障可由 Spark 框架本身很好的 handle,不引起可用性问题,现在只讨论 driver 故障恢复。
如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution:
读取 WAL offsetlog 恢复出最新的 offsets 等
读取 batchCommitLog 决定是否需要重做最近一个批次
这样即可保证每次执行的计算结果,在 sink 这个层面,是 不重不丢 的 —— 即使中间发生过 1 次或以上的失效和恢复。
一致性语义
micro-batch 模式可以提供 end-to-end 的 exactly-once 语义。原因是因为在 input 端和 output 端都做了很多工作来进行保证,比如 input 端 replayable + wal,output 端写入幂等。
continuous mode 只能提供 at-least-once, 它牺牲了一次性语义以减少延迟。
micro-batch 基于偏移和提交日志,而 continuous 仅使用提交的日志。
假设提交日志中的最后 batch / epoch id是#2,它对应于偏移量(4,5,6)(= 3个Kafka分区)。偏移日志中的最新值是(7,8,9),它们对应于批次/时期#3。
continuous 执行读取提交日志并查看最后一个纪元#2。然后它检索与其对应的偏移量((4,5,6))。所以它会重新处理(4,5,6)。
micro-batch 执行更多步骤。首先,它获取最后的偏移日志((7,8,9))。接下来,它将它们与最后提交的偏移量((4,5,6))进行比较。由于两者具有不同的批次ID,因此保留对应于最近批次ID的偏移量以进行处理。
应用举例
- trigger once 设置为周期启动的job(cron),会自动处理offset,所以可以不间断的处理数据。
- 低时延的更新,可以通过streaming模式,从redis上更新到下游。
- 结合维表,可以使用 stream 和 batch 进行 join/union。
- binlog-> streaming -> delta (采集mysql数据倒入hive)
- 数据写入多个表