上篇文章对Flink中常见的windowAssigner,如:TumblingEventTimeWindow, SlidingEventTimeWindow, EventTimeSessionWindows 等中的默认提供的trigger:EventTimeTrigger进行了剖析,讨论了trigger注册的回调函数的执行,trigger会触发怎样的triggerResult,如何处理迟到数据的trigger触发,以及提醒了需要注意的几点,明白flink中的EventTimeTrigger为什么这么写。
这篇文章就讨论下当数据流入window时,会发生什么。我们着重分析的类就是上篇文章中提到的WindowOperator类。
在讨论数据流入window会触发的一系列动作之前,我们需要明确一个window操作包括哪些部分。
- window assigner 指明数据流中的数据属于哪个window
- trigger 指明在哪些条件下触发window计算,基于处理数据时的时间以及事件的特定属性、
- evictor 可选组件,在window执行计算前或后,将window中的数据移除,如使用globalWindow时,由于该window的默认trigger为永不触发,所以既需要实现自定义trigger,也需要实现evictor,移除部分已经计算完毕的数据。
- window process flink默认提供的有 ReduceFunction,AggragateFunction.还可以自定义实现 windowProcessFunction。作用便是当trigger返回FIRE结果时,计算window的结果。
这篇文章,先不讨论window的early emit,即window在触发完整的计算之前,为减小延迟而进行的提前计算。不过通过上面的四个组件,也可以想明白,只要自定义的trigger定时或定量或根据某条件在window触发完整计算之前,产生FIRE结果,便可使用window process先行根据目前window中的不完整数据,提前计算一个结果。
以下例子中,我们使用以下前提:
- 使用的window assigner实现的是 WindowAssigner 而不是 MergingWindowAssigner
- 同时我们分析的 event-time 语义
- window操作作用在 KeyedStream 上
接下来要分析的 WindowOperator 类可以看做是一个调度器,它持有window操作相关的所有组件,不包括evctor,因为含有evctor组件的window操作被封装为 EvictingWindowOperator 。 WindowOperator 定义了
window assigner, trigger, windowProcessFunction 的执行顺序如何,它们之间的执行逻辑等。
WindowOperator 类实现了 Triggerable 接口,为什么要实现这个接口呢?这是为了方便为 window 指派 window 过期时的回调函数,因此 WindowOperater 类中实现了 onEventTime 与 onProcessTime 两个方法,分别对应不用语义下 window 过期时的回调函数的执行逻辑,即:当flink 决定删除 window 时,都做了什么操作,删除了哪些东西。
WindowOperator 类也实现了 OneInputStreamOperator 接口,实现了其 processElement 方法,当新数据流入时,调用该方法。
在真正分析代码前,有必要先说明下 WindowOperator 中的几个内部类:
- WindowContext 每个window都有自己的context,持有该window与window的state。在 WindowOperator 类中,该类对应 processContext 成员。
- Context 实现了 Trigger.OnMergeContext 接口。作为一个处理window中trigger的公共类,该类中持有key与window两个成员,方便根据key与window处理特定的trigger。在 WindowOperator 类中,该类对应 triggerContext 成员。
另外,在 WindowOperator 中有一个 windowState 成员,以 window 为 namespace,以隔离不同的window的context。这里虽然叫做 windowState 。但是通过稍后的代码可以发现,该类存储的是不同window中的对应的原始数据(processWindowFunction情况)或结果(ReduceFunction/AggregateFunction情况)。
有了上面的基本认识,下面分析,当数据流入window时,发生了什么。
首先,根据刚刚所说,每一个流入的数据都会调用 processElement 方法:
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
WindowOperator 类,首先使用用户选择的 windowAssigner 将流入的数据分配到响应的window中,有可能是1个,0个甚至多个window。
第二句的 isSkippedElement 变量,在我们的前提下,没有作用。
第三句获取当前数据所在的KeyedStream的那个key上。这个key在稍后的 triggerContext 成员中会用到。
再下面的if语句不会进入,在我们前提中,我们进入的是else语句:
} else {
for (W window: elementWindows) {
// drop if the window is already late
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
}
在 else 语句中,对该流入数据所分配到的每个window执行以下逻辑:
- 判断该window是否已过期。判断条件如下:
protected boolean isWindowLate(W window) {
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
通过该判断可以看出,flink提供了 allowedLateness 变量用于指明允许迟到数据最多可以迟到多久,因此,window的过期时间不仅仅是其 maxTimestamp, 还需要加上等待迟到数据的时间。
- 获取该window的context,将数据加入。
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
这里需要指出,flink提供了两种 window process:
- Incremental Aggregation Functions。ReduceFunction 与 AggregateFunction ,其特点是无需保存 window 中的所有数据,一旦新数据进入,便可与之前的中间结果进行计算,因此这种 window 中其状态仅需保存一个结果便可。
- ProcessWindowFunction。用户实现 ProcessWindowFunction 的自定义处理逻辑,特点是保存了 window 的所有数据,只有触发了 trigger 后才可以执行计算。
因此这里 windowState 根据 window 获取到的 state 是不同的。针对第一种情况,返回的是 HeapReducingState, HeapAggregatingState ,当执行到 windowState.add(element.getValue());
语句时,便直接得出结果。而第二种情况,返回的是 HeapListState ,当执行到 windowState.add(element.getValue());
语句时,仅仅是将数据加入到list中。
- 继续往下走,获取该key下该 window 的trigger,并执行trigger的 onElement 方法,来确定需不需要触发计算。
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
根据上篇的解释可知,在默认trigger下,仅当流入的是迟到数据才会在 onElement 中触发trigger。
因此,这里大家就可以实现自己的trigger,根据流入的每一个数据,判断是否需要触发trigger,达到提前触发计算的目的。
- 根据trigger的结果,执行不同的逻辑
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
- FIRE: 代表触发window的计算。首先从 windowState 中获取内容。由刚刚的分析知道,在 Incremental Aggregation Functions 情况下,返回的是一个常量 : 计算结果。在 ProcessWindowFunction 情况下,返回的是当前window中的数据,一个list的iterator对象。然后执行
emitWindowContents(window, contents);
语句
private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
processContext.window = window;
userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}
该方法会调用用户实现的计算逻辑(ProcessWindowFunction实现类),将流入的数据 contents 经过计算,得到结果后写入 timestampedCollector。
- PURGE: 代表需要清除window。这里就是执行
windowState.clear();
语句。结果便是window的计算结果(Incremental Aggregation Functions 情况下)或者缓存的数据(ProcessWindwoFunction 情况下)清除,即:该window的状态被清除。但是此时window对象还未删除,相关的trigger中的自定义状态与 ProcessWindowFunction 中的状态还未删除。
- 最后,为该window注册失效后的回调函数,在window失效后,删除window并做其他收尾工作。
registerCleanupTimer(window);
前面说过了, WindowOperator 实现了 Triggerable 接口,且有 triggerContext 获取当前正在处理的window的trigger来注册回调函数,registerCleanupTimer(window)
方法如下:
protected void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
}
if (windowAssigner.isEventTime()) {
triggerContext.registerEventTimeTimer(cleanupTime);
} else {
triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
public void registerEventTimeTimer(long time) {
internalTimerService.registerEventTimeTimer(window, time);
}
通过上面的两个方法可以看出,这里的回调函数并不是注册在当前window的trigger中,而是注册在 WindowOperator 内部持有的一个 internalTimerService 中。
那该window是在何时才会失效呢?
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
window 在 watermark 的时间戳大于 maxTimestamp + allowedLateness 时,才会过期,这便是 flink 提供的除了 watermark 外的另一种处理迟到数据的机制。
我们再看看,window过期后,回调函数是怎么处理的。
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
windowState.setCurrentNamespace(stateWindow);
}
} else {
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
ACC contents = null;
if (windowState != null) {
contents = windowState.get();
}
if (contents != null) {
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
上面与 MergingWindowAssigner 相关的分支我们不进入分析。
为了方便分析,我们假设 window 的过期时间 maxTimestamp + allowedLateness = 2000 + 1500 = 3500。 当 watermark 的时间戳大于 3500 时,便会触发该回调函数,为了说明普遍性,假设 watermark 的时间戳为 4000。
将与 MergingWindowAssigner 无关的语句去掉后,该方法的前面部分如下:
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
ACC contents = null;
if (windowState != null) {
contents = windowState.get();
}
首先,将 triggerContext 根据key与window找到特定的trigger,同样 windowState 根据window找到特定的window中的context,该context中存储的是window的计算结果(Incremental Aggregation Functions 情况下)或者缓存的数据(ProcessWindwoFunction 情况下)。
接下来:
if (contents != null) {
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
以上代码说明了当window过期,触发过期的回到函数时,都会做哪些操作。
可以看到,会先到该window的trigger中执行 onEventTme 方法。此时的 timer.getTimestamp() 的值为 3500。而一般我们自定义的trigger中一般不会注册一个时间为 maxTimestamp + allowedLateness 的回调函数。以flink的默认trigger - EventTimeTrigger 为例,其注册的回调函数最大时间便是 maxTimestamp 。因此,除非用户设置的 allowedLateness 为0,且在trigger中注册了时间为 maxTmestamp 的回调函数,否则此处不会有triggerResult。
假设此处确实有对应的回调函数且被执行,下面的两个if条件的逻辑与上面分析 processElement 时一样,不再赘述。
再往下走,调用了 clearAllState 方法,进入该方法:
private void clearAllState(W window,AppendingState<IN, ACC> windowState,MergingWindowSet<W> mergingWindows) throws Exception {
windowState.clear();
triggerContext.clear();
processContext.window = window;
processContext.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
}
}
- windowState.clear() 将window中暂存的结果或原始数据删除。
- triggerContext.clear() 调用该window的trigger的clear()方法,删除用户自定义trigger中的自定义状态,同时删除trigger的timer。需要用户在实现自定义trigger且使用自定义状态时,实现该方法,方便此时调用清除状态,避免内存问题。
- processContext.clear(); 调用用户实现自定义逻辑的,ProcessWindwoFuncton接口实现类的clear()方法,目的同上。