概述
- Windows是Flink流计算的核心,重点在于窗口的理解和应用;
- 建议详细阅读官网的window介绍,链接地址:https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html
- 基于flink-1.9.0官网理解,文章略长略枯燥,建议耐心看完。
窗口Window类型
- 根据官网的介绍;如上:
The first snippet refers to keyed streams, while the second to non-keyed ones. As one can see, the only difference is the keyBy(...) call for the keyed streams and the window(...) which becomes windowAll(...) for non-keyed streams. This is also going to serve as a roadmap for the rest of the page.
- 可以看出,对于窗口的操作分为两种,一种是keyedstrem,另一种是DataStream;他们的主要区别也仅仅在于建立窗口的时候一个为.window(...),一个为.windowAll(...)。对于Keyedstream的窗口来说,他可以使得多任务并行计算,每一个logical key stream将会被独立的进行处理。
- Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
- Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
- 按照窗口的Assigner来分,窗口可以分为Tumbling window, sliding window,session window,global window,custom window;
- 每种窗口又可分别基于processing time和event time,这样的话,窗口的类型严格来说就有很多;
- 还有一种window叫做count window,依据元素到达的数量进行分配;
窗口window的生命周期
- 总结一句话说:窗口的生命周期开始在第一个属于这个窗口的元素到达的时候,结束于第一个不属于这个窗口的元素到达的时候。窗口结束时间:
end timestamp plus the user-specified allowed lateness
; - 每一个Window都有一个Trigger和process functions (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction)
Keyed vs Non-Keyed Windows
- 第一件事,确定DataStream是否需要keyBy;
- 如果是使用了keyBy()算子,初始化定义的DataStream会被切割为keyed stream,如果没有使用keyBy()算子,则不会被切割;
- 在keyed stream情况中,可以使用event的任何属性作为键,keyed stream允许window 计算任务由多个task并行执行,keyed stream可以独立于剩余的stream计算处理,拥有相同key的元素会被发送到相同的并行task中;
- 在non-keyed stream,原始的DataStream不会被切割到很多stream,并且所有的window都是单个task执行,并行度为1;
Window Assigners窗口分配器
- 当定义DataStream是keyedStream或者non-key stream,接下来需要定义一个window assigner。Window Assinger定义了元素如何被分配到window。根据datastream的类型选择windowAssinger中的方法----window(...) (for keyed streams) or the windowAll() (for non-keyed streams);
- windowAssigner负责分配每一个incoming element到一个或者多个window中。针对大多数的使用场景和案例,Flink内有预定义的window assigner,分别为
tumbling windows, sliding windows, session windows and global windows
,共四种。用户可以通过继承WindowAssigner class
实现自定义window assigner消费。 - Flink内置的window assigner(除了global windows)是基于time分配element到window种,time可以是event time或者processing time。(event time官网连接 event time)
- time-based windows有一个start timestamp(开始时间戳)和一个end timestamp(结束时间戳),start timestamp和end timestamp是左开右闭即
[5,10)
类型;两者共同定义window size(窗口长度);确切的说window size由start timestamp、end timestamp 、allow lateness 共同确定。 - 在flink代码内,Flink在使用基于时间(time-based)的窗口时采用TimeWindow类型,该窗口具有查询开始和结束时间戳的方法,flink带有附加方法maxTimestamp()返回给定窗口的最大允许时间戳。
Tumbling Windows 翻滚窗口
-
翻滚窗口 Tumbling Windows分配器将每个元素分配给指定窗口大小的窗口。 翻滚窗 Tumbling Windows 具有固定的长度,不重叠。 例如,如果指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口,如下图所示。
- 窗口使用方法
java
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
- 窗口时间间隔可以使用
Time.milliseconds(x), Time.seconds(x), Time.minutes(x)
中的一种定义; - 翻滚窗口tumbling window有一个offset参数选项,可以改变window的对齐方式;
An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
Sliding Windows 滑动窗口
- 滑动窗口sliding window分配器将element分配到指定长度的window。和翻滚窗口类似,sliding window固定相同间隔分配窗口,只不过每个窗口之间有重叠。窗口重叠的部分如果比窗口小,窗口将会有多个重叠,即一个元素可能被分配到多个窗口里去。
java
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
- 窗口时间间隔可以使用
Time.milliseconds(x), Time.seconds(x), Time.minutes(x)
中的一种定义; An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
Session Windows会话窗口
- 主要是根据活动的事件进行窗口化,他们通常不重叠,也没有一个固定的开始和结束时间。
- session window和
tumbling windows and sliding windows
相比有明显的差异;一个session window关闭通常是由于一段时间没有收到元素。 -
session window会话窗口分配器可以配置静态会话间隙或会话间隙提取器功能,该功能定义不活动时间段的长度。 当此期限到期时,当前会话将关闭,后续元素将分配给新的会话窗口。
- 官网api
java
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
- 静态会话间隔可以使用
one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x)
其中一种定义; - 动态会话间隔可以通过实现
SessionWindowTimeGapExtractor
接口; - Attention:
- 由于session window会话窗口没有固定的开始和结束时间,因此它们的计算加工方式与tumbling window和sliding window不同。 在内部,session window 算子为每个到达的记录创建一个新窗口,如果它们彼此之间的距离比定义的间隙更接近,则将窗口合并在一起。 为了可合并,session window 算子需要合并触发器和合并窗口函数,例如ReduceFunction,AggregateFunction或ProcessWindowFunction(FoldFunction无法合并。)
Global Windows 全局窗口
- global window配器将具有相同键的所有元素分配给同一个全局窗口。 此窗口仅在指定自定义触发器时才有用。 否则,将不执行任何计算,因为global session没有可以聚合元素而自然结束窗口的生命周期;
- 官网api
java
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
Window Functions 窗口函数
- 窗口函数只有四种:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction。前两个执行得更有效,因为Flink可以增量地聚合每个到达窗口的元素。
- ProcessWindowFunction获取窗口中包含的所有元素的Iterable以及有关元素所属窗口的其他元信息。
- Flink必须在调用函数之前在内部缓冲窗口中的所有元素,所以使用ProcessWindowFunction进行操作效率不高。不过ProcessWindowFunction可以跟其他的窗口函数(
ReduceFunction, AggregateFunction, or FoldFunction
)结合使用,其他函数接受增量信息,ProcessWindowFunction接受窗口的元数据。
ReduceFunction
- ReduceFunction指定如何组合输入中的两个元素以生成相同类型的输出元素。 Flink使用ReduceFunction逐步聚合窗口的元素。
- 官网api
java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
AggregateFunction
- AggregateFunction是ReduceFunction的通用版本,有三种类型:输入类型(IN),累加器类型(ACC)和输出类型(OUT)。 输入类型是输入流DataStream中元素的类型,AggregateFunction具有将一个输入元素添加到累加器的方法。 该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于从累加器提取输出(类型OUT)的方法。
- 官网api
java
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
scala
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
FoldFunction
- FoldFunction指定窗口的输入元素如何与输出类型的元素组合。 对于添加到窗口的每个元素和当前输出值,将逐步调用FoldFunction。 第一个元素与输出类型的预定义初始值组合。
- 官网api
java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("") { (acc, v) => acc + v._2 }
- Attention:fold() cannot be used with session windows or other mergeable windows.
ProcessWindowFunction
- ProcessWindowFunction能获取包含窗口所有元素的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。 这是以性能和资源消耗为代价的,因为元素不能以递增方式聚合,而是需要在内部进行缓冲,直到认为窗口已准备好进行处理。
- DataStream的key是通过为keyBy()调用指定的KeySelector提取。 在元组索引键或字符串字段引用的情况下,此键类型始终为Tuple,必须手动将其转换为正确大小的元组以提取键字段。
- 官网api
java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
ProcessWindowFunction with Incremental Aggregation
- ProcessWindowFunction可以与ReduceFunction,AggregateFunction或FoldFunction结合使用,以便在元素到达窗口时递增聚合元素。 当关闭窗口时,
ReduceFunction,AggregateFunction或FoldFunction
将为ProcessWindowFunction提供聚合结果。 这允许ReduceFunction,AggregateFunction或FoldFunction
访问ProcessWindowFunction的附加窗口元信息的同时递增地计算窗口。
Incremental Window Aggregation with ReduceFunction
- java
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
- scala
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((context.window.getStart, min))
}
)
Incremental Window Aggregation with AggregateFunction
- java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
- scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]): () = {
val average = averages.iterator.next()
out.collect((key, average))
}
}
Incremental Window Aggregation with FoldFunction
- java
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(cur + 1, 2);
return acc;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
}
}
- scala
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
Using per-window state in ProcessWindowFunction
- ProcessWindowFunction 可以在获取keyed 状态(state)---(as any rich function can)外,同样可以使用 keyed state--keyed state的作用域为当前正在处理的窗口。
- 在这种情况下,了解每个窗口状态所指的窗口是很重要的。 涉及不同的“窗口”:
- 指定窗口操作时定义的窗口:这可能是1小时的翻滚窗口或滑动1小时的2小时滑动窗口。
- 给定key的已定义窗口的实际实例:对于user-id xyz,这可能是从12:00到13:00的时间窗口。 这基于窗口定义,并且将基于作业当前正在处理的key的数量以及基于事件落入的time slots而存在许多窗口。
- 每窗口状态与后两者相关联。 这意味着如果我们处理1000个不同key的事件,并且所有这些event当前都落入
[12:00,13:00)
时间窗口,那么将有1000个窗口实例,每个窗口实例都有自己的keyed pre-window state(窗口状态)。 - 在Context对象上有两个方法,process()调用接收它们允许访问两种类型的状态:
- globalState():允许访问未限定在窗口的keyed state
- windowState():允许访问也限定在窗口范围内的keyed state
- 在开发中,如果一个window会多次触发执行,那么这个功能将非常有用;如果window有迟到的数据或者自定义的window trigger会推测执行(提前触发window计算),在这种情况下,你需要在per-window state存储先前触发的信息或触发执行的次数;
- 使用窗口状态时,清除窗口时清除该状态也很重要。 这应该在clear()方法中发生。
Triggers 触发器
- 触发器决定window function何时进行执行(由)。 每个WindowAssigner都带有一个默认触发器。
- 如果默认触发器不符合需要,您可以使用trigger(...)自定义触发器。
- trigger interface有五种方法允许trigger对不同的事件做出响应:
- oneElement():为添加到窗口的每个元素调用方法。
- oneEventTime():当注册的事件时间计时器触发时,将调用onEventTime()方法。
- onProcessingTime():当注册的处理时间计时器触发时,将调用onProcessingTime()方法。
- onMerge():onMerge()方法与有状态触发器相关,并在它们相应的窗口合并时合并两个触发器的状态,例如: 使用会话窗口时。
- clear():finaly,clear()方法执行删除相应窗口时所需的任何操作。
- 关于上述方法需要注意两点:
- 1)前三个决定如何通过返回TriggerResult来对其调用事件进行操作。 该操作可以是以下之一:
- CONTINUE: do nothing,
- FIRE: trigger the computation,
- PURGE: clear the elements in the window, and(官网缺失内容......)
- FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.
2)这些方法中的任何一种都可用于为将来的操作processing- or event-time时间计时器。
Fire and Purge(触发和清除)
- 一旦trigger确定窗口已准备好进行处理,它就会触发执行,即返回FIRE或FIRE_AND_PURGE。 这是窗口算子给出当前窗口结果的信号。 给定一个带有ProcessWindowFunction的窗口,所有元素都传递给ProcessWindowFunction(可能在将它们传递给evictor后)。 具有ReduceFunction,AggregateFunction或FoldFunction的Windows只会发出聚合的结果。
- 当trigger触发时,它可以是FIRE或FIRE_AND_PURGE。 当FIRE保留窗口内容时,FIRE_AND_PURGE会删除其内容。 默认情况下,预先实现的触发器只需FIRE而不会清除窗口状态。
- purge只是简单地删除窗口的内容,但是并将保留有关窗口和trigger state的潜在元信息。
Built-in and Custom Triggers
- The EventTimeTrigger fires based on the progress of event-time as measured by watermarks.
- The ProcessingTimeTrigger fires based on processing time.
- The CountTrigger fires once the number of elements in a window exceeds the given limit.
- The PurgingTrigger takes as argument another trigger and transforms it into a purging one.
Evictors 驱逐器
- 除了WindowAssigner和Trigger之外,Flink的窗口模型还允许指定可选的Evictor。
- evictor可以实现在窗口trigger触发后,window funtions执行
before and/or after
移除元素; - evictor接口有两种方法:evictBefore,evictAfter;
- flink中有三种已经实现好的evictor:
- CountEvictor:从窗口保持用户指定数量的元素,并从窗口缓冲区的开头丢弃剩余的元素。
- DeltaEvictor:采用DeltaFunction和阈值,计算窗口缓冲区中最后一个元素与其余每个元素之间的差值,并删除delta大于或等于阈值的值。
- TimeEvictor:将参数作为一个间隔(以毫秒为单位),对于给定的窗口,它查找其元素中的最大时间戳max_ts,并删除时间戳小于(max_ts - interval)的所有元素。
- By default, all the pre-implemented evictors apply their logic before the window function.
Allowed Lateness
- 默认情况下,当水印超过窗口end timestamp时,会删除延迟元素。 但是,Flink允许为窗口算子指定最大允许延迟(
allowed lateness
)。 允许延迟指定元素在被删除之前可以迟到多少时间,并且其默认值为0。在element的watermark已经过了窗口end timestamp,但在element的watermark在窗口的end timestamp+allowed lateness时间前,element仍然被添加到对应的窗口中。 根据所使用的trriger,延迟但未丢弃的元素可能会导致窗口再次触发。 EventTimeTrigger就是这种情况。 - By default, the allowed lateness is set to 0. That is, elements that arrive behind the watermark will be dropped.
- 官网api
java
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
- When using the GlobalWindows window assigner no data is ever considered late because the end timestamp of the global window is Long.MAX_VALUE。
Getting late data as a side output
- Using Flink’s side output feature you can get a stream of the data that was discarded as late.
- You first need to specify that you want to get late data using
sideOutputLateData(OutputTag)
on the windowed stream. Then, you can get the side-output stream on the result of the windowed operation: - 官网api
java
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
scala
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
Late elements considerations 迟到元素的考虑项
- 当指定允许的延迟大于0时,在水印通过窗口结束后保持窗口及其内容。 在这些情况下,当一个迟到但未删除的元素到达时,它可能触发窗口的trigger。 这些触发被称为
late firings
,因为它们是由迟到事件触发的,与main firing
相反,后者是窗口的第一次触发。 在会话窗口的情况下,late firings
可以进一步导致窗口的合并,因为它们可以“桥接”两个预先存在的未合并窗口之间的间隙。 - 你应该知道,后期触发发出的元素应该被视为先前计算的更新结果,即你的数据流将包含同一计算的多个结果。 根据你的应用程序,你需要考虑这些重复的结果或对其进行重复数据删除。
Consecutive windowed operations 连续的窗口操作
- 如前所述,计算窗口结果的时间戳的方式以及水印与窗口交互的方式允许将连续的窗口操作串联在一起。 当想要执行两个连续的窗口操作时,希望使用不同的key,但仍希望来自同一上游窗口的元素最终位于同一下游窗口中。 考虑这个例子:
DataStream<Integer> input = ...;
DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());
DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());
scala
val input: DataStream[Int] = ...
val resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer())
val globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction())
Useful state size considerations state size使用注意事项
- Windows可以在很大一段时间内(例如几天,几周或几个月)定义,因此可以累积非常大的状态。 在估算窗口计算的存储要求时,需要记住几条规则:
- Flink为每个窗口创建一个每个元素的副本。 鉴于此,翻滚窗口保留每个元素的一个副本(一个元素恰好属于一个窗口,除非它被延迟)。 相反,滑动窗口会创建每个元素的几个,如"window assigner"部分中所述。 因此,window size为1天且滑动1秒的滑动窗口可能不是一个好主意。
- ReduceFunction,AggregateFunction和FoldFunction可以显着降低存储要求,因为它们迫切地聚合元素并且每个窗口只存储一个值。 相反,只需使用ProcessWindowFunction就需要存储所有元素。
- 使用Evictor可以防止任何预聚合,因为在应用计算之前,窗口的所有元素都必须通过evictor传递(参见Evictors)。
- 参考博客: Flink窗口介绍及应用