概述
Apache Flink 是一个为生产环境而生的流处理器,具有易于使用的 API,可以用于定义高级流分析程序。
Flink 的 API 在数据流上具有非常灵活的窗口定义,使其在其他开源流处理框架中脱颖而出。
Windows定义
按固定时间区间计算该区间的值,例如15s计算汇总一次:
无穷的流,数据不间断的,例如有累计数据的需求,按上图的逻辑是处理不到的。换一种思路,每隔 15 秒,我们都将与上一次的结果进行 sum 操作(滑动聚合):
流是无界的,我们不能限制流,所以上述方案也解决不了需求,但可以在有一个有界的范围内处理无界的流数据。
那么按一分钟一个时间窗口计算,相当于一个定义了一个 Window(窗口),window 的界限是1分钟,且每分钟内的数据互不干扰,因此也可以称为翻滚(不重合)窗口
第一分钟的数量为8,第二分钟是22,第三分钟是27。。。这样,1个小时内会有60个window。
再考虑一种情况,每30秒统计一次过去1分钟的数量之和:
通常来讲,Window 就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。window 又可以分为基于时间(Time-based)的 window 以及基于数量(Count-based)的 window。
Flink窗口类型
对于窗口的操作主要分为两种,分别对于Keyedstream和Datastream。他们的主要区别也仅仅在于建立窗口的时候一个为.window(...),一个为.windowAll(...)。对于Keyedstream的窗口来说,他可以使得多任务并行计算,每一个logical key stream将会被独立的进行处理。
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...)/.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
按照窗口的Assigner来分,窗口可以分为
Tumbling window, sliding window,session window,global window,custom window每种窗口又可分别基于processing time和event time。
还有一种window叫做count window,依据元素到达的数量进行分配,之后也会提到。
窗口的生命周期开始在第一个属于这个窗口的元素到达的时候,结束于第一个不属于这个窗口的元素到达的时候。
窗口的操作
Tumbling window
固定相同间隔分配窗口,每个窗口之间没有重叠。
tumbling time windows(翻滚时间窗口)
data.keyBy(1)
.timeWindow(Time.minutes(1)) //tumbling time window 每分钟统计一次数量和
.sum(1);
sliding time windows(滑动时间窗口)
固定相同间隔分配窗口,只不过每个窗口之间有重叠。窗口重叠的部分如果比窗口小,窗口将会有多个重叠,即一个元素可能被分配到多个窗口里去。
data.keyBy(1)
.timeWindow(Time.minutes(1), Time.seconds(30)) //sliding time window 每隔 30s 统计过去一分钟的数量和
.sum(1);
那么流处理器如何解释时间?
Apache Flink 具有三个不同的时间概念,即 processing time, event time 和 ingestion time。
默认采用:
TimeCharacteristic.ProcessingTime
我们可以设置为其他:
1. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
2. env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
指定为EventTime的source需要自己定义event time以及emit watermark,或者在source之外通过assignTimestampsAndWatermarks在程序手工指定
Watermark解释
Count Windows
Apache Flink 还提供计数窗口功能。如果计数窗口设置的为 100 ,那么将会在窗口中收集 100 个事件,并在添加第 100 个元素时计算窗口的值。
tumbling count window
data.keyBy(1)
.countWindow(100) //统计每 100 个元素的数量之和
.sum(1);
sliding count window
data.keyBy(1)
.countWindow(100, 10) //每 10 个元素统计过去 100 个元素的数量之和
.sum(1);
Session window
主要是根据活动的事件进行窗口化,他们通常不重叠,也没有一个固定的开始和结束时间。一个session window关闭通常是由于一段时间没有收到元素。在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。
// 静态间隔时间
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));
// 动态时间
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(EventTimeSessionWindows.withDynamicGap(()));
Global window
同keyed的元素分配到一个窗口里
WindowedStream<MovieRate, Integer, GlobalWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(GlobalWindows.create());
Flink 的窗口机制
到达窗口操作符的元素被传递给 WindowAssigner。WindowAssigner 将元素分配给一个或多个窗口,可能会创建新的窗口。窗口本身只是元素列表的标识符,它可能提供一些可选的元信息,例如 TimeWindow 中的开始和结束时间。注意,元素可以被添加到多个窗口,这也意味着一个元素可以同时在多个窗口存在。
每个窗口都拥有一个 Trigger(触发器),该 Trigger(触发器) 决定何时计算和清除窗口。当先前注册的计时器超时时,将为插入窗口的每个元素调用触发器。在每个事件上,触发器都可以决定触发(即、清除(删除窗口并丢弃其内容),或者启动并清除窗口。一个窗口可以被求值多次,并且在被清除之前一直存在。注意,在清除窗口之前,窗口将一直消耗内存。
当 Trigger(触发器) 触发时,可以将窗口元素列表提供给可选的 Evictor,Evictor 可以遍历窗口元素列表,并可以决定从列表的开头删除首先进入窗口的一些元素。然后其余的元素被赋给一个计算函数,如果没有定义 Evictor,触发器直接将所有窗口元素交给计算函数。
-
计算函数接收 Evictor 过滤后的窗口元素,并计算窗口的一个或多个元素的结果。 DataStream API 接受不同类型的计算函数,包括预定义的聚合函数,如 sum(),min(),max(),以及 ReduceFunction,FoldFunction 或 WindowFunction。
- 窗口函数就是这四个:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction。前两个执行得更有效,因为Flink可以增量地聚合每个到达窗口的元素。
- Flink必须在调用函数之前在内部缓冲窗口中的所有元素,所以使用ProcessWindowFunction进行操作效率不高。不过ProcessWindowFunction可以跟其他的窗口函数结合使用,其他函数接受增量信息,ProcessWindowFunction接受窗口的元数据。
这些是构成 Flink 窗口机制的组件。 接下来我们逐步演示如何使用 DataStream API 实现自定义窗口逻辑。 我们从 DataStream [IN] 类型的流开始,并使用 key 选择器函数对其分组,该函数将 key 相同类型的数据分组在一块。
SingleOutputStreamOperator<xxx> data = env.addSource(...);
data.keyBy();
自定义Windows
Window Assigner
负责将元素分配到不同的 window。
Window API 提供了自定义的 WindowAssigner 接口,我们可以实现 WindowAssigner 的方法
public abstract Collection<W> assignWindows(T element, long timestamp)
同时,对于基于 Count 的 window 而言,默认采用了 GlobalWindow 的 window assigner,例如:
keyBy.window(GlobalWindows.create())
Trigger(触发器)
触发器定义了窗口何时准备好被窗口处理。每个窗口分配器默认都有一个触发器,如果默认的触发器不符合你的要求,就可以使用trigger(...)自定义触发器。
通常来说,默认的触发器适用于多种场景。例如,多有的event-time窗口分配器都有一个EventTimeTrigger作为默认触发器。该触发器在watermark通过窗口末尾时出发。
PS:GlobalWindow默认的触发器时NeverTrigger,该触发器从不出发,所以在使用GlobalWindow时必须自定义触发器。
Evictor(驱逐器-可选)
Evictors可以在触发器触发之后以及窗口函数被应用之前和/或之后可选择的移除元素。使用Evictor可以防止预聚合,因为窗口的所有元素都必须在应用计算逻辑之前先传给Evictor进行处理。
通过 apply WindowFunction 来返回 DataStream 类型数据
利用 Flink 的内部窗口机制和 DataStream API 可以实现自定义的窗口逻辑,例如 session window。