Flink本身提供了多层API,前面介绍的DataStream API只是其中的一环。
在前面的章节介绍了诸多Flink提供的算子(如map、filter、widow等)。除了使用这些已有的算子,我们也可以自定义算子,也即本章的处理函数(process function)。
基本处理函数(ProcessFunction)
之前讲的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限,如map算子,只能获取到当前的数据。而窗口聚合算子AggregateFuntion以及富函数(如RichMapFunction),可以拿到上下文状态等信息。但是无论那种算子,想要访问时间戳,或者当前水位线信息,都是完全做不到的,这时就需要处理函数(ProcessFunction)。
处理函数提供了一个“定时服务”(TimeService),我们可以通过它访问流中的时间(event),时间戳(timestamp),水位线(Watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数汉可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个DataStream API的最底层基础。
处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用process()
方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。stream.process(new MyProcessFunction)
。
一个梨子:
package com.whu.chapter07
import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.functions._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object ProcessFunctionDemo {
def main(args:Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(new ClickSource())
.assignAscendingTimestamps(_.timeStamp)
.process(new ProcessFunction[Event, String] {
// 每来一条元素都会调用一次
override def processElement(i:Event, context:ProcessFunction[Event,String]#Context, collector: Collector[String]):Unit = {
if(i.user.equals("Mary")){
// 向下游发送数据
collector.collect(i.user)
}else if(i.user.equals("bob")){
collector.collect(i.user)
}
// 打印当前水位线
println(context.timerService.currentWatermark())
}
})
.print()
env.execute()
}
}
ProcessFunction解析:
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
...
public abstract void processElement(I value, Context ctx, Collector<O> out)
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
throws Exception {}
...
}
processElement方法用于处理原始,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及收集器(Collector)out。方法没有返回值,处理之后的输出数据通过收集器的out定义。
- value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致;
- ctx: 类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法output();
- out: “收集器”(类型为Collector),用于返回输出数据。使用方式与flatMap()算子中的收集器完全一样,直接调用out.collect()方法就可以像下游发出一个数据。这个方法可多次调用,也可以不调用。
onTimer用于定时触发的操作,这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的 timestamp 是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。
在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中我们并没有使用定时器。
处理函数的分类
Flink提供了8个不同的处理函数:
- ProcessFunction,最基本的处理函数,基于 DataStream 直接调用 process()时作为参数传入;
- KeyedProcessFunction,对流按键分区后的处理函数,基于 KeyedStream 调用 process()时作为参数传入。要想使用定时器,必须基于 KeyedStream;
- ProcessWindowFunction,开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process()时作为参数传入;
- ProcessAllWindowFunction,同样是开窗之后的处理函数,基于 AllWindowedStream 调用 process()时作为参数传入;
- CoProcessFunction,合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process()时作为参数传入;
- ProcessJoinFunction,间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process()时作为参数传入;
- BroadcastProcessFunction,广播连接流处理函数,基于 BroadcastConnectedStream 调用 process()时作为参数传入;
- KeyedBroadcastProcessFunction,按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用 process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream
与广播流(BroadcastStream)做连接之后的产物。
按键分区处理函数(KeyedProcessFunction)
在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy()算子对数据流进行“按键分区”,得到一个 KeyedStream。而只有在 KeyedStream 中,才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy()分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction。
package com.whu.chapter07
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import java.sql.Timestamp
object KeyedProcessFunctionDemo {
def main(args: Array[String]):Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(new ClickSource())
.keyBy(r=>true)
.process(new KeyedProcessFunction[Boolean, Event, String]{
override def processElement(i: Event, context: KeyedProcessFunction[Boolean, Event, String]#Context, collector: Collector[String]): Unit = {
val currTs = context.timerService.currentProcessingTime()
collector.collect("数据到达时间:"+new Timestamp(currTs))
// 注册10秒钟之后的处理时间定时器
context.timerService().registerProcessingTimeTimer(currTs+10*1000L)
}
// 定时器逻辑
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("定时器触发时间:"+new Timestamp(timestamp))
}
}).print()
env.execute()
}
}
窗口处理函数
窗 口 处 理 函 数 ProcessWindowFunction 的 使 用 与 其 他 窗 口 函 数 类 似 , 也 是 基 于WindowedStream 直接调用方法就可以,只不过这时调用的是 process()。
ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
...
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
public void clear(Context context) throws Exception {}
public abstract class Context implements java.io.Serializable {...}
}
其有四个类型的参数:
- IN: input, 数据流中窗口任务的输入数据类型;
- OUT: output, 窗口任务进行计算之后的输出数据类型;
- KEY: 数据中key的类型;
- W:窗口的类型,是window的子类型,一般情况下我们定义时间窗口,w就是TimeWindow;
由于全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是 processElement(),
而是改成了 process()。方法包含四个参数:
- key:窗口做统计计算基于的键,也就是之前keyBy()用来区分的字段;
- context:当前窗口进行计算的上下文,他的类型就是ProcessWindowFunction内部定义的抽象类context;
- elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型;
- out:用来发送数据输出计算结果的收集器,类型为Collector;
一个计算网页浏览量topN的例子:
package com.whu.chapter07
import org.apache.flink.streaming.api.functions._
import com.whu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
import java.sql.Timestamp
import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable.ListBuffer
object ProcessWindowFunctionDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new ClickSource())
.assignAscendingTimestamps(_.timeStamp)
// 只需要url就可以统计数量,所以抽取url转换成String,直接开窗统计
stream.map(_.url)
// 开窗
.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
// 初始化一个map,key为url,value为url的pv数据
val urlCountMap = Map[String, Long]()
// 将url和pv数据写入Map中
elements.foreach(r=>urlCountMap.get(r) match {
case Some(count) => urlCountMap.put(r, count+1L)
case None=> urlCountMap.put(r, 1L)
})
// 将Map中的键值对转换成列表数据结构
// 列表中的元素是(K,V)元组
var mapList = new ListBuffer[(String, Long)]()
urlCountMap.keys.foreach(key=>
urlCountMap.get(key) match {
case Some(count) => mapList+=((key, count))
case None => mapList
}
)
// 按照浏览量进行降序排列
mapList.sortBy(-_._2)
// 输出
val result = new StringBuilder
result.append("=======\n")
for(i <- 0 to 1){
val temp = mapList(i)
result.append("浏览量No."+(i+1)+" ")
.append("url:"+temp._1+" ")
.append("浏览量No."+(i+2)+" ")
.append("url:"+temp._2+" ")
.append("窗口结束时间是:"+ new Timestamp((context.window.getEnd))+"\n")
}
}
})
.print()
env.execute()
}
}