我们知道flink已经封装了很多高级的api供用户访问使用,但是有时候我们可能根据不同的需求,发现提供的高级api不能满足我们的需求,这个时候flink也为我们提供了low-level层面的api,比如processFunction,通过processFunction函数,我们可以访问state,进行注册process ,event time定时器来帮助我们完成一项复杂的操作。在我们使用process 函数的时候,有一个前提就是要求我们必须使用在keyedStream上,有两个原因,一个是getRuntimeContext 得到的StreamingRuntimeContext 只提供了KeyedStateStore的访问权限,所以只能访问keyd state,另外一个是我们在注册定时器的时候,需要有三个维度,namespace,key,time,所以要求我们有key,这就是在ProcessFunction中只能在keyedStream做定时器注册,在flink1.8.0版本中,有ProcessFunction 和KeyedProcessFunction 这个类面向用户的api,但是在ProcessFunction 类我们无法注册定时器,在ProcessOperator源码中我们发现注册是抛出异常
为什么KeyedProcessFunction可以调用RuntimeContext对象,通过源码看一下
KeyedProcessFunction是一个抽象类,继承了AbstractRichFunction抽象类
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
进入AbstractRichFunction类,可以看到,该类实现了实现了RichFunction,和Serializable接口
RichFunction中定义了getRuntimeContext方法,在AbstractRichFunction中实现了该方法
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable
我们调用getRuntimeContext方法时,便可以获取RuntimeContext对象,对状态等进行操作
private transient RuntimeContext runtimeContext;
@Override
public void setRuntimeContext(RuntimeContext t) {
this.runtimeContext = t;
}
@Override
public RuntimeContext getRuntimeContext() {
if (this.runtimeContext != null) {
return this.runtimeContext;
} else {
throw new IllegalStateException("The runtime context has not been initialized.");
}
}
现在开始看process算子的实现
process算子需要传入的值,传入值分为两种processFunc和KeyedProcessFunc,但不建议使用ProcessFunction了,建议使用KeyedProcessFunction,所以主要看KeyedProcessFunction
数据流在经过keyBy之后会转换成KeyedStream,先看一下KeyStream中的procss方法
KeyedStream是DataStream的实现
public class KeyedStream<T, KEY> extends DataStream<T>
可以看到process需要传入一个keyedProcessFunction (用编写的),如果用户不指定输出类型,会获取默认类型
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
可以看出将函数封装成了一个KeyedProcessOperator类型,这个类继承了AbstractUdfStreamOperator类和实现了OneInputStreamOperato接口和Triggerable接口
public class KeyedProcessOperator<K, IN, OUT>
extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace>
该类重写了 父类的open方法,实现了AbstractUdfStreamOperator的processElement方法和Triggerable的onEventTime和onProcessingTime方法, 现在看一下实现的逻辑
open方法
在方法中,首先调用父类Open方法进行初始化操作, 然后初始化本类服务,
@Override
public void open() throws Exception {
//调用父类open方法 进行初始化
super.open();
//创建一个 timestampedCollector 来给定Flink output ----英翻 时间戳收集器
collector = new TimestampedCollector<>(output);
//定义内部定时服务
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
//internalTimerService 封装到 TimeService中
//获取timerSErvice SimpleTimerService 内部使用了 internalTimerService
TimerService timerService = new SimpleTimerService(internalTimerService);
//传入 userFun 和 定时器 返回context对象
context = new ContextImpl(userFunction, timerService);
//同上 返回定时器onTimerContext对象
onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
这里重要的是这行代码context = new ContextImpl(userFunction, timerService);
现在看下他的内部实现, 这个是内部类,他继承了KeyedProcessFunction的Context类
在该类中实现了Countext对象,对我们提供上下文服务
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
private final TimerService timerService;
private StreamRecord<IN> element;
ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
}
@Override
public Long timestamp() {
checkState(element != null);
if (element.hasTimestamp()) {
return element.getTimestamp();
} else {
return null;
}
}
@Override
public TimerService timerService() {
return timerService;
}
@Override
public <X> void output(OutputTag<X> outputTag, X value) {
if (outputTag == null) {
throw new IllegalArgumentException("OutputTag must not be null.");
}
output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}
@Override
@SuppressWarnings("unchecked")
public K getCurrentKey() {
return (K) KeyedProcessOperator.this.getCurrentKey();
}
}
在看一下 processElement 方法,主要调用用户逻辑
这里userFunc调用processElement方法,该方法为用户定义的内容
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
//赋值element
context.element = element;
//将context对象 和collector 传入 userFunc中
//为用户层级提供了访问时间和注册定时器的入口
userFunction.processElement(element.getValue(), context, collector);
//赋值调用完后 清空
context.element = null;
}
当用户通过ctx.timerService().registerProcessingTimeTimer(); 设置定时器后,定时器触发会走KeyedProcessOperator的onEventTime或onProcessingTime方法 这里看下onEventTime的实现
在EventTime计时器触发时调用,在方法中 调用了 invokeUserFunction方法
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
我们跟随invokeUserFunction进入方法 看下实现,这个方法会调用 用户的onTime方法,执行里面逻辑
private void invokeUserFunction(
TimeDomain timeDomain,
InternalTimer<K, VoidNamespace> timer) throws Exception {
onTimerContext.timeDomain = timeDomain;
onTimerContext.timer = timer;
//这个时候也就是调用了我们自定义类K\eyedProcessFunction中的onTimer,
//调用时传入了OnTimerContextImpl对象,其持有IntervalTimeService服务,也可以注册定时器操作。
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
最终 将用户的Func 包装成KeyedProcessOperator对象 调用transform方法,最终返回转换后的DataStream
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
现在我们追踪进去看,最终调用了doTransform方法,经过一系列的转换,将将operator添加到拓补图中,最终将operator转换成SingleOutputStreamOperator对象,该类继承DataStream,进行返回
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// 检查输出类型是否为MissingTypeInfo,如果是抛出异常,
transformation.getOutputType();
//创建OneInputTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
transformation, //input --上游的 transformation
operatorName,
operatorFactory, //需要进行转换操作的
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//多个级联的map和filter操作会被transform成为一连串的OneInputTransformation。
// 后一个transformation的input指向前一个transformation
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
到此整个process算子调用完成
如有错误,欢迎指正!