Reposted from the original article with the permission of its author, magina.
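In Flink, both WindowOperator and KeyedProcessOperator hold an object implementing InternalTimerService. Through this object, users can register event-time and processing-time timers. An event-time timer fires when the watermark passes its timestamp, and a processing-time timer fires when the machine's processing time passes it. In both cases a callback is invoked to perform some action. This article focuses on KeyedProcessOperator. WindowOperator works in much the same way, so it is not covered in detail here.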
When a StreamTask is scheduled for execution, its lifecycle is as follows:
* -- invoke()
* |
* +----> Create basic utils (config, etc) and load the chain of operators
* +----> operators.setup()
* +----> task specific init()
* +----> initialize-operator-states()
* +----> open-operators()
* +----> run()
* +----> close-operators()
* +----> dispose-operators()
* +----> common cleanup
* +----> task specific cleanup()
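The diagram is essentially a template method. invoke() runs a fixed sequence of phases, and subclasses plug in the task-specific steps (init, run, cleanup). The stdlib-only sketch below mirrors that order.

Every name in it (LifecycleOperator, LoggingOperator, LifecycleTask) is hypothetical. It also leaves out much of what the real StreamTask handles, such as state restore, checkpointing and cleanup after failures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator contract mirroring the per-operator phases of the diagram.
interface LifecycleOperator {
    void setup();
    void initializeState();
    void open();
    void close();
    void dispose();
}

// Hypothetical operator that only records which phase ran.
class LoggingOperator implements LifecycleOperator {
    private final String name;
    private final List<String> log;

    LoggingOperator(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    public void setup() { log.add(name + ".setup"); }
    public void initializeState() { log.add(name + ".initializeState"); }
    public void open() { log.add(name + ".open"); }
    public void close() { log.add(name + ".close"); }
    public void dispose() { log.add(name + ".dispose"); }
}

// Hypothetical task: invoke() fixes the phase order, subclasses fill in the task-specific steps.
abstract class LifecycleTask {
    protected final List<LifecycleOperator> chain;
    protected final List<String> log;

    LifecycleTask(List<LifecycleOperator> chain, List<String> log) {
        this.chain = chain;
        this.log = log;
    }

    final void invoke() {
        chain.forEach(LifecycleOperator::setup);            // operators.setup()
        init();                                             // task specific init()
        chain.forEach(LifecycleOperator::initializeState);  // initialize-operator-states()
        chain.forEach(LifecycleOperator::open);             // open-operators()
        run();                                              // run()
        chain.forEach(LifecycleOperator::close);            // close-operators()
        chain.forEach(LifecycleOperator::dispose);          // dispose-operators()
        log.add("common cleanup");
        cleanup();                                          // task specific cleanup()
    }

    protected abstract void init();
    protected abstract void run();
    protected abstract void cleanup();
}

class LifecycleDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<LifecycleOperator> chain = List.of(new LoggingOperator("op", log));
        LifecycleTask task = new LifecycleTask(chain, log) {
            protected void init() { log.add("task.init"); }
            protected void run() { log.add("task.run"); }
            protected void cleanup() { log.add("task.cleanup"); }
        };
        task.invoke();
        System.out.println(log);
    }
}
```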
The open() method of KeyedProcessOperator is called during the open-operators() phase of the StreamTask:
@Override
public void open() throws Exception {
    super.open();
    collector = new TimestampedCollector<>(output);
    // Build and start the InternalTimerService for this operator; it is used to access time and register timers
    InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
    TimerService timerService = new SimpleTimerService(internalTimerService);
    context = new ContextImpl(userFunction, timerService);
    onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
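The getInternalTimerService("user-timers", ..., this) call can be pictured as a per-operator registry, where `this` (the operator) becomes the callback target:

- The first call with a given name creates a timer service bound to that target.
- Later calls with the same name return the same instance.
- Advancing the watermark fans out to every service created so far.

The sketch below models only that shape. It ignores namespaces, serializers and key groups. TimerCallbackTarget, NamedTimerService and NamedTimerServiceManager are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the callback target (in Flink, the operator passed as `this`).
interface TimerCallbackTarget {
    void onEventTime(long timestamp);
}

// Hypothetical, heavily simplified timer service: it only remembers its target and watermark.
class NamedTimerService {
    final TimerCallbackTarget target;
    long currentWatermark = Long.MIN_VALUE;

    NamedTimerService(TimerCallbackTarget target) {
        this.target = target;
    }

    void advanceWatermark(long time) {
        currentWatermark = time;   // firing of due timers is left out of this sketch
    }
}

// Hypothetical per-operator registry of timer services, keyed by name.
class NamedTimerServiceManager {
    private final Map<String, NamedTimerService> timerServices = new HashMap<>();

    // The first call with a name creates a service bound to the target; later calls reuse it.
    NamedTimerService getInternalTimerService(String name, TimerCallbackTarget target) {
        return timerServices.computeIfAbsent(name, n -> new NamedTimerService(target));
    }

    // Each watermark is handed to every service created so far.
    void advanceWatermark(long watermark) {
        for (NamedTimerService service : timerServices.values()) {
            service.advanceWatermark(watermark);
        }
    }
}

class NamedTimerServiceDemo {
    public static void main(String[] args) {
        NamedTimerServiceManager manager = new NamedTimerServiceManager();
        TimerCallbackTarget operator = timestamp -> { };
        NamedTimerService a = manager.getInternalTimerService("user-timers", operator);
        NamedTimerService b = manager.getInternalTimerService("user-timers", operator);
        manager.advanceWatermark(1_000L);
        System.out.println((a == b) + " " + a.currentWatermark);
    }
}
```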
The StreamTask then calls run() to start the computation:
@Override
protected void run() throws Exception {
    // cache processor reference on the stack, to make the code more JIT friendly
    // run() uses the inputProcessor to read elements from the input gate; an element can be a regular record or a watermark
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
    while (running && inputProcessor.processInput()) {
        // all the work happens in the "processInput" method
    }
    // the input is finished, notify non-head operators
    if (running) {
        synchronized (getCheckpointLock()) {
            OneInputStreamOperator<IN, OUT> headOperator = getHeadOperator();
            for (StreamOperator<?> operator : operatorChain.getAllOperatorsTopologySorted()) {
                if (operator.getOperatorID().equals(headOperator.getOperatorID())) {
                    continue;
                }
                Preconditions.checkState(operator instanceof OneInputStreamOperator);
                ((OneInputStreamOperator<?, ?>) operator).endInput();
            }
        }
    }
}
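Stripped of Flink's types, run() is a drain loop followed by an end-of-input notification. The sketch below reproduces only that control flow:

1. Keep calling processInput() while the task is running and input remains.
2. If the task is still running afterwards, collect, in topological order, the operators that would receive endInput(), skipping the head operator.

SketchInputProcessor and SketchRunLoop are hypothetical names, and operators are represented by plain string IDs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical input processor: handles one element per call, returns false once input is exhausted.
class SketchInputProcessor {
    private final Iterator<String> input;
    final List<String> processed = new ArrayList<>();

    SketchInputProcessor(List<String> elements) {
        this.input = elements.iterator();
    }

    boolean processInput() {
        if (!input.hasNext()) {
            return false;                 // end of input
        }
        processed.add(input.next());      // in Flink: dispatch a record, a watermark, ...
        return true;
    }
}

class SketchRunLoop {
    // Returns the IDs of the operators that would receive endInput(), in call order.
    static List<String> run(SketchInputProcessor processor, BooleanSupplier running,
                            List<String> operatorIdsTopologySorted, String headOperatorId) {
        while (running.getAsBoolean() && processor.processInput()) {
            // all the work happens in processInput()
        }
        List<String> endInputCalls = new ArrayList<>();
        if (running.getAsBoolean()) {     // only when the task was not cancelled
            for (String operatorId : operatorIdsTopologySorted) {
                if (operatorId.equals(headOperatorId)) {
                    continue;             // the head operator is skipped
                }
                endInputCalls.add(operatorId);
            }
        }
        return endInputCalls;
    }
}

class RunLoopDemo {
    public static void main(String[] args) {
        SketchInputProcessor processor = new SketchInputProcessor(List.of("r1", "r2", "r3"));
        List<String> ended = SketchRunLoop.run(processor, () -> true,
                List.of("source", "map", "sink"), "source");
        System.out.println(processor.processed + " " + ended);
    }
}
```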
In the processInput() method of StreamInputProcessor:
// ... (excerpt from processInput(); earlier code omitted)
    else {
        // now we can do the actual processing
        StreamRecord<IN> record = recordOrMark.asRecord();
        synchronized (lock) {
            numRecordsIn.inc();
            streamOperator.setKeyContextElement1(record);
            // Regular record: this eventually calls the user function's processElement; for KeyedProcessOperator,
            // that is the processElement of the user-defined KeyedProcessFunction
            streamOperator.processElement(record);
        }
    }
    return true;
} else if (recordOrMark.isWatermark()) {
    // handle watermark: hand it to the StatusWatermarkValve together with the channel it arrived on
    statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
    continue;
} else if (recordOrMark.isStreamStatus()) {
    // ... (remaining branches omitted)
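statusWatermarkValve.inputWatermark(mark, currentChannel) does not forward every watermark it receives. With several input channels, the operator's watermark is the minimum over the channels, and it is passed on only when that minimum advances.

The sketch below models just that rule, with one simplifying assumption: every channel is treated as active. The real valve also uses StreamStatus to leave idle channels out. SketchWatermarkValve and its LongConsumer output are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical, simplified valve: every channel is assumed active (no idle/StreamStatus handling).
class SketchWatermarkValve {
    private final long[] channelWatermarks;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private final LongConsumer output;

    SketchWatermarkValve(int numChannels, LongConsumer output) {
        this.channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        this.output = output;
    }

    void inputWatermark(long watermark, int channel) {
        // A channel's watermark only moves forward; an older value is ignored.
        if (watermark <= channelWatermarks[channel]) {
            return;
        }
        channelWatermarks[channel] = watermark;
        // The operator's watermark is the minimum over all channels...
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // ...and it is forwarded only when that minimum advances.
        if (min > lastEmittedWatermark) {
            lastEmittedWatermark = min;
            output.accept(min);
        }
    }
}

class WatermarkValveDemo {
    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        SketchWatermarkValve valve = new SketchWatermarkValve(2, emitted::add);
        valve.inputWatermark(10, 0); // channel 1 has no watermark yet: nothing emitted
        valve.inputWatermark(5, 1);  // min(10, 5) = 5 is emitted
        valve.inputWatermark(20, 1); // min(10, 20) = 10 is emitted
        valve.inputWatermark(8, 0);  // older than channel 0's 10: ignored
        valve.inputWatermark(30, 0); // min(30, 20) = 20 is emitted
        System.out.println(emitted); // [5, 10, 20]
    }
}
```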
Next, let us look at how a watermark is processed. The call eventually reaches the processWatermark method of AbstractStreamOperator:
public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark); // Step 1: advance the timer services, firing the event-time timers that are now due
    }
    output.emitWatermark(mark); // Step 2: forward the watermark downstream
}
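The order of the two steps matters. The timer services are advanced before output.emitWatermark(mark) is called, so records produced by the onTimer callbacks go downstream ahead of the watermark that triggered them. Downstream operators therefore do not see those records arrive behind that watermark.

The following minimal sketch shows the resulting output order. WatermarkOrderingOperator is a hypothetical class, and strings stand in for records and watermarks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical operator: event-time timers in a min-heap, downstream output as a list of strings.
class WatermarkOrderingOperator {
    final List<String> downstream = new ArrayList<>();   // everything sent downstream, in order
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

    void registerEventTimeTimer(long time) {
        eventTimeTimers.add(time);
    }

    void processWatermark(long watermark) {
        // Step 1: fire every due timer; whatever the callback emits goes out first.
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            long timestamp = eventTimeTimers.poll();
            downstream.add("record@" + timestamp);       // stand-in for onTimer output
        }
        // Step 2: only then forward the watermark.
        downstream.add("watermark@" + watermark);
    }
}

class WatermarkOrderingDemo {
    public static void main(String[] args) {
        WatermarkOrderingOperator op = new WatermarkOrderingOperator();
        op.registerEventTimeTimer(5);
        op.registerEventTimeTimer(12);
        op.registerEventTimeTimer(3);
        op.processWatermark(10);
        System.out.println(op.downstream); // [record@3, record@5, watermark@10]
    }
}
```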
So how is the watermark actually processed? Let us look at the advanceWatermark method of InternalTimeServiceManager:
public void advanceWatermark(Watermark watermark) throws Exception {
    // Every InternalTimerService previously created through getInternalTimerService must process this watermark
    for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
}
Looking at HeapInternalTimerService, we can see that every timer whose timestamp is less than or equal to the watermark has its callback triggered:
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time; // update the current watermark
    InternalTimer<K, N> timer;
    // Take out every timer whose timestamp is <= the watermark and trigger its callback
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
        timerSet.remove(timer);
        eventTimeTimersQueue.remove();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer); // triggerTarget is the concrete operator instance
    }
}
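The loop relies on the queue being ordered by timestamp. peek() always returns the earliest pending timer, so the loop can stop at the first timer that still lies in the future. The key context is switched to the timer's key before the callback runs, which lets onTimer work with that key's state.

Below is a minimal sketch of the same loop. It assumes a single namespace and omits the per-key-group timer sets that the real code uses for de-duplication. FiringTimer, SketchKeyContext and SketchHeapTimerService are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical timer: key + timestamp (a single namespace is assumed).
final class FiringTimer {
    final String key;
    final long timestamp;

    FiringTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

// Hypothetical stand-in for the operator's key context.
class SketchKeyContext {
    private String currentKey;

    void setCurrentKey(String key) { currentKey = key; }

    String getCurrentKey() { return currentKey; }
}

class SketchHeapTimerService {
    // Min-heap by timestamp: peek() is always the earliest pending timer.
    private final PriorityQueue<FiringTimer> eventTimeTimersQueue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final SketchKeyContext keyContext;
    private final Consumer<FiringTimer> triggerTarget;   // plays the role of the operator's onEventTime
    private long currentWatermark = Long.MIN_VALUE;

    SketchHeapTimerService(SketchKeyContext keyContext, Consumer<FiringTimer> triggerTarget) {
        this.keyContext = keyContext;
        this.triggerTarget = triggerTarget;
    }

    void registerEventTimeTimer(String key, long time) {
        eventTimeTimersQueue.add(new FiringTimer(key, time));
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        FiringTimer timer;
        // Fire every timer with timestamp <= watermark; stop at the first one still in the future.
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
            eventTimeTimersQueue.remove();
            keyContext.setCurrentKey(timer.key);   // the callback runs in the timer's key context
            triggerTarget.accept(timer);
        }
    }

    long currentWatermark() { return currentWatermark; }
}

class HeapTimerDemo {
    public static void main(String[] args) {
        SketchKeyContext keyContext = new SketchKeyContext();
        List<String> fired = new ArrayList<>();
        SketchHeapTimerService service = new SketchHeapTimerService(
                keyContext, t -> fired.add(keyContext.getCurrentKey() + "@" + t.timestamp));
        service.registerEventTimeTimer("a", 30);
        service.registerEventTimeTimer("b", 10);
        service.registerEventTimeTimer("a", 20);
        service.advanceWatermark(20);
        System.out.println(fired); // [b@10, a@20]
    }
}
```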
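Here triggerTarget is the concrete operator instance. It is the this argument of the getInternalTimerService call in open(), and InternalTimeServiceManager's getInternalTimerService method passes it on to HeapInternalTimerService.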
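Next, look at the onEventTime method of KeyedProcessOperator. It calls the onTimer method of the user's KeyedProcessFunction to do the actual work. WindowOperator likewise uses onEventTime or onProcessingTime. When the window's trigger fires, it takes the data held in the state for that key and window and passes it to the window function for computation. The result is then sent downstream.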
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
    collector.setAbsoluteTimestamp(timer.getTimestamp());
    invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}

private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
    onTimerContext.timeDomain = timeDomain;
    onTimerContext.timer = timer;
    userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); // the onTimer method implemented by the user
    onTimerContext.timeDomain = null;
    onTimerContext.timer = null;
}
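invokeUserFunction reuses a single OnTimerContextImpl instance created in open(). It fills in timeDomain and timer just before calling onTimer and clears them afterwards. The context is therefore only meaningful while the callback runs, and no context object is allocated per timer. Output emitted from onEventTime carries the timer's timestamp, because setAbsoluteTimestamp is called first.

The stdlib-only sketch below shows that pattern. SketchTimeDomain, SketchCollector, SketchOnTimerContext, SketchTimerFunction and SketchKeyedProcessOperator are hypothetical stand-ins, not Flink's classes.

```java
import java.util.ArrayList;
import java.util.List;

enum SketchTimeDomain { EVENT_TIME, PROCESSING_TIME }

// Hypothetical collector that stamps every output with an absolute timestamp.
class SketchCollector {
    final List<String> emitted = new ArrayList<>();
    private long timestamp;

    void setAbsoluteTimestamp(long timestamp) { this.timestamp = timestamp; }

    void collect(String value) { emitted.add(value + "@" + timestamp); }
}

// Hypothetical reusable context: fields are only set while onTimer runs.
class SketchOnTimerContext {
    SketchTimeDomain timeDomain;
    Long timerTimestamp;
}

// Hypothetical user function, loosely modelled on KeyedProcessFunction#onTimer.
interface SketchTimerFunction {
    void onTimer(long timestamp, SketchOnTimerContext ctx, SketchCollector out);
}

class SketchKeyedProcessOperator {
    private final SketchTimerFunction userFunction;
    private final SketchOnTimerContext onTimerContext = new SketchOnTimerContext(); // allocated once, reused
    final SketchCollector collector = new SketchCollector();

    SketchKeyedProcessOperator(SketchTimerFunction userFunction) {
        this.userFunction = userFunction;
    }

    // Called by the timer service when an event-time timer is due.
    void onEventTime(long timerTimestamp) {
        collector.setAbsoluteTimestamp(timerTimestamp);   // output inherits the timer's timestamp
        invokeUserFunction(SketchTimeDomain.EVENT_TIME, timerTimestamp);
    }

    private void invokeUserFunction(SketchTimeDomain timeDomain, long timerTimestamp) {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timerTimestamp = timerTimestamp;
        userFunction.onTimer(timerTimestamp, onTimerContext, collector);
        onTimerContext.timeDomain = null;                  // the context is invalid outside the callback
        onTimerContext.timerTimestamp = null;
    }
}

class OnTimerDemo {
    public static void main(String[] args) {
        SketchKeyedProcessOperator op = new SketchKeyedProcessOperator(
                (timestamp, ctx, out) -> out.collect("timer in " + ctx.timeDomain));
        op.onEventTime(100L);
        System.out.println(op.collector.emitted); // [timer in EVENT_TIME@100]
    }
}
```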
So far we have seen how timers are fired when the watermark advances. Another question remains: how are timers registered?
WindowOperator and KeyedProcessOperator hold a TimerService, directly or indirectly, and timers are registered through that object:
/**
 * Interface for working with time and timers.
 */
@PublicEvolving
public interface TimerService {

    /** Returns the current processing time. */
    long currentProcessingTime();

    /** Returns the current event-time watermark. */
    long currentWatermark();

    /**
     * Registers a timer to be fired when processing time passes the given time.
     *
     * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
     * in a keyed context, such as in an operation on
     * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
     * will also be active when you receive the timer notification.
     */
    void registerProcessingTimeTimer(long time);

    /**
     * Registers a timer to be fired when the event time watermark passes the given time.
     *
     * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
     * in a keyed context, such as in an operation on
     * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
     * will also be active when you receive the timer notification.
     */
    void registerEventTimeTimer(long time);
}
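The TimerService the user sees takes no namespace parameter. In KeyedProcessOperator.open(), the InternalTimerService<VoidNamespace> is wrapped in a SimpleTimerService, which forwards each call with the single VoidNamespace value. WindowOperator, by contrast, uses the window itself as the namespace.

The sketch below reproduces that adapter shape. SketchInternalTimerService, SketchVoidNamespace, UserTimerService, SketchSimpleTimerService and RecordingInternalTimerService are hypothetical stand-ins for the Flink types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-value namespace, standing in for VoidNamespace.INSTANCE.
enum SketchVoidNamespace { INSTANCE }

// Hypothetical namespaced service, loosely modelled on InternalTimerService<N>.
interface SketchInternalTimerService<N> {
    long currentProcessingTime();
    long currentWatermark();
    void registerProcessingTimeTimer(N namespace, long time);
    void registerEventTimeTimer(N namespace, long time);
}

// User-facing view with the same shape as the TimerService interface above.
interface UserTimerService {
    long currentProcessingTime();
    long currentWatermark();
    void registerProcessingTimeTimer(long time);
    void registerEventTimeTimer(long time);
}

// Adapter in the spirit of SimpleTimerService: supplies the fixed namespace on every call.
class SketchSimpleTimerService implements UserTimerService {
    private final SketchInternalTimerService<SketchVoidNamespace> internal;

    SketchSimpleTimerService(SketchInternalTimerService<SketchVoidNamespace> internal) {
        this.internal = internal;
    }

    @Override public long currentProcessingTime() { return internal.currentProcessingTime(); }

    @Override public long currentWatermark() { return internal.currentWatermark(); }

    @Override public void registerProcessingTimeTimer(long time) {
        internal.registerProcessingTimeTimer(SketchVoidNamespace.INSTANCE, time);
    }

    @Override public void registerEventTimeTimer(long time) {
        internal.registerEventTimeTimer(SketchVoidNamespace.INSTANCE, time);
    }
}

// Hypothetical recording implementation that only logs what it receives.
class RecordingInternalTimerService implements SketchInternalTimerService<SketchVoidNamespace> {
    final List<String> calls = new ArrayList<>();

    @Override public long currentProcessingTime() { return 0L; }

    @Override public long currentWatermark() { return Long.MIN_VALUE; }

    @Override public void registerProcessingTimeTimer(SketchVoidNamespace namespace, long time) {
        calls.add("pt:" + namespace + ":" + time);
    }

    @Override public void registerEventTimeTimer(SketchVoidNamespace namespace, long time) {
        calls.add("et:" + namespace + ":" + time);
    }
}

class SimpleTimerServiceDemo {
    public static void main(String[] args) {
        RecordingInternalTimerService internal = new RecordingInternalTimerService();
        UserTimerService timerService = new SketchSimpleTimerService(internal);
        timerService.registerEventTimeTimer(1_000L);
        timerService.registerProcessingTimeTimer(2_000L);
        System.out.println(internal.calls); // [et:INSTANCE:1000, pt:INSTANCE:2000]
    }
}
```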
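KeyedProcessOperator passes the TimerService indirectly to the KeyedProcessFunction, through the ContextImpl and OnTimerContextImpl objects built in open(). Users can therefore register and query timers inside the function itself. Two points deserve attention: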
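1. Within the same namespace, each key has at most one timer per timestamp. Registering the same timestamp twice for a key results in a single callback, while timers with different timestamps for the same key are kept separately.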
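2. For processing-time timers (for example when the TimeCharacteristic is ProcessingTime), a requested time earlier than the current processing time triggers the callback immediately.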
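Both notes can be checked with a small simulation that models a single namespace and a manually advanced processing-time clock:

- Timers are de-duplicated by (key, timestamp).
- A timer whose time has already passed fires at registration time.

One simplification: this sketch fires such a timer synchronously inside the register call, while the real service fires it from its timer thread. ManualClockTimerService and PtTimer are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical timer identity: (key, timestamp); equal pairs are treated as the same timer.
final class PtTimer {
    final String key;
    final long timestamp;

    PtTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }

    @Override public boolean equals(Object o) {
        return o instanceof PtTimer && ((PtTimer) o).key.equals(key) && ((PtTimer) o).timestamp == timestamp;
    }

    @Override public int hashCode() { return Objects.hash(key, timestamp); }
}

// Hypothetical processing-time timer service driven by a manual clock instead of a timer thread.
class ManualClockTimerService {
    private final PriorityQueue<PtTimer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Set<PtTimer> registered = new HashSet<>();
    private long now;
    final List<String> fired = new ArrayList<>();

    ManualClockTimerService(long startTime) {
        this.now = startTime;
    }

    void registerProcessingTimeTimer(String key, long time) {
        PtTimer timer = new PtTimer(key, time);
        if (!registered.add(timer)) {
            return;                       // same key and timestamp already pending: keep one timer
        }
        queue.add(timer);
        if (time <= now) {
            fireDue();                    // time already passed: fire right away (synchronously here)
        }
    }

    void advanceTime(long newTime) {
        now = newTime;
        fireDue();
    }

    private void fireDue() {
        while (!queue.isEmpty() && queue.peek().timestamp <= now) {
            PtTimer timer = queue.poll();
            registered.remove(timer);
            fired.add(timer.key + "@" + timer.timestamp);
        }
    }
}

class ManualClockDemo {
    public static void main(String[] args) {
        ManualClockTimerService service = new ManualClockTimerService(100L);
        service.registerProcessingTimeTimer("a", 150L);
        service.registerProcessingTimeTimer("a", 150L); // duplicate (same key, same time): ignored
        service.registerProcessingTimeTimer("a", 160L); // same key, different time: a second timer
        service.registerProcessingTimeTimer("b", 50L);  // already in the past: fires immediately
        System.out.println(service.fired);               // [b@50]
        service.advanceTime(200L);
        System.out.println(service.fired);               // [b@50, a@150, a@160]
    }
}
```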